f232178c5d5400ab90aad300d2f1d2d70909b98d
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 import arvados.util
43 from .util import collectionUUID
44 from ruamel.yaml import YAML
45 from ruamel.yaml.comments import CommentedMap, CommentedSeq
46
47 import arvados_cwl.arvdocker
48 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
49 from ._version import __version__
50 from . import done
51 from .context import ArvRuntimeContext
52
53 logger = logging.getLogger('arvados.cwl-runner')
54
55 def trim_anonymous_location(obj):
56     """Remove 'location' field from File and Directory literals.
57
58     To make internal handling easier, literals are assigned a random id for
59     'location'.  However, when writing the record back out, this can break
60     reproducibility.  Since it is valid for literals not to have a 'location'
61     field, remove it.
62
63     """
64
65     if obj.get("location", "").startswith("_:"):
66         del obj["location"]
67
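# A minimal before/after sketch of trim_anonymous_location (values are
# hypothetical): a File literal carrying an anonymous "_:" id keeps its
# content but loses the synthetic 'location'.
#
#   f = {"class": "File", "location": "_:af0c51b0", "basename": "a.txt", "contents": "hi"}
#   trim_anonymous_location(f)
#   # f == {"class": "File", "basename": "a.txt", "contents": "hi"}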
68
69 def remove_redundant_fields(obj):
70     for field in ("path", "nameext", "nameroot", "dirname"):
71         if field in obj:
72             del obj[field]
73
74
75 def find_defaults(d, op):
76     if isinstance(d, list):
77         for i in d:
78             find_defaults(i, op)
79     elif isinstance(d, dict):
80         if "default" in d:
81             op(d)
82         else:
83             for i in viewvalues(d):
84                 find_defaults(i, op)
85
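# Usage sketch (shapes are illustrative): find_defaults calls 'op' on every
# mapping that contains a "default" key, recursing through lists and dicts but
# not into a matched mapping itself.
#
#   doc = {"inputs": [{"id": "x", "default": {"class": "File", "path": "a.txt"}}]}
#   find_defaults(doc, lambda d: print(d["default"]))
#   # prints {'class': 'File', 'path': 'a.txt'}
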
86 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
87     return Builder(
88                  job=joborder,
89                  files=[],               # type: List[Dict[Text, Text]]
90                  bindings=[],            # type: List[Dict[Text, Any]]
91                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
92                  names=None,               # type: Names
93                  requirements=requirements,        # type: List[Dict[Text, Any]]
94                  hints=hints,               # type: List[Dict[Text, Any]]
95                  resources={},           # type: Dict[str, int]
96                  mutation_manager=None,    # type: Optional[MutationManager]
97                  formatgraph=None,         # type: Optional[Graph]
98                  make_fs_access=None,      # type: Type[StdFsAccess]
99                  fs_access=None,           # type: StdFsAccess
100                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
101                  timeout=runtimeContext.eval_timeout,             # type: float
102                  debug=runtimeContext.debug,               # type: bool
103                  js_console=runtimeContext.js_console,          # type: bool
104                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
105                  loadListing="",         # type: Text
106                  outdir="",              # type: Text
107                  tmpdir="",              # type: Text
108                  stagedir="",            # type: Text
109                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
110                  container_engine="docker"
111                 )
112
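# Note: the Builder constructed by make_builder is used in this module only to
# evaluate parameter references and expressions while discovering secondary
# files; most fields are deliberately left empty.  A hedged sketch, reusing the
# call pattern that appears further down in this file:
#
#   builder = make_builder(job_order, tool.hints, tool.requirements,
#                          ArvRuntimeContext(), tool.metadata)
#   sfname = builder.do_eval("$(inputs.sample.basename).bai", context=primary)
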
113 def search_schemadef(name, reqs):
114     for r in reqs:
115         if r["class"] == "SchemaDefRequirement":
116             for sd in r["types"]:
117                 if sd["name"] == name:
118                     return sd
119     return None
120
121 primitive_types_set = frozenset(("null", "boolean", "int", "long",
122                                  "float", "double", "string", "record",
123                                  "array", "enum"))
124
125 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
126     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
127         # union type, collect all possible secondaryFiles
128         for i in inputschema:
129             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
130         return
131
132     if inputschema == "File":
133         inputschema = {"type": "File"}
134
135     if isinstance(inputschema, basestring):
136         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
137         if sd:
138             inputschema = sd
139         else:
140             return
141
142     if "secondaryFiles" in inputschema:
143         # set secondaryFiles; this may be inherited by compound types.
144         secondaryspec = inputschema["secondaryFiles"]
145
146     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
147         not isinstance(inputschema["type"], basestring)):
148         # compound type (union, array, record)
149         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
150
151     elif (inputschema["type"] == "record" and
152           isinstance(primary, Mapping)):
153         #
154         # record type, find secondary files associated with fields.
155         #
156         for f in inputschema["fields"]:
157             p = primary.get(shortname(f["name"]))
158             if p:
159                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
160
161     elif (inputschema["type"] == "array" and
162           isinstance(primary, Sequence)):
163         #
164         # array type, find secondary files of elements
165         #
166         for p in primary:
167             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
168
169     elif (inputschema["type"] == "File" and
170           isinstance(primary, Mapping) and
171           primary.get("class") == "File"):
172
173         if "secondaryFiles" in primary or not secondaryspec:
174             # Nothing to do.
175             return
176
177         #
178         # Found a file, check for secondaryFiles
179         #
180         specs = []
181         primary["secondaryFiles"] = secondaryspec
182         for i, sf in enumerate(aslist(secondaryspec)):
183             if builder.cwlVersion == "v1.0":
184                 pattern = sf
185             else:
186                 pattern = sf["pattern"]
187             if pattern is None:
188                 continue
189             if isinstance(pattern, list):
190                 specs.extend(pattern)
191             elif isinstance(pattern, dict):
192                 specs.append(pattern)
193             elif isinstance(pattern, str):
194                 if builder.cwlVersion == "v1.0":
195                     specs.append({"pattern": pattern, "required": True})
196                 else:
197                     specs.append({"pattern": pattern, "required": sf.get("required")})
198             else:
199                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
200                     "Expression must return list, object, string or null")
201
202         found = []
203         for i, sf in enumerate(specs):
204             if isinstance(sf, dict):
205                 if sf.get("class") == "File":
206                     pattern = None
207                     if sf.get("location") is None:
208                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
209                             "File object is missing 'location': %s" % sf)
210                     sfpath = sf["location"]
211                     required = True
212                 else:
213                     pattern = sf["pattern"]
214                     required = sf.get("required")
215             elif isinstance(sf, str):
216                 pattern = sf
217                 required = True
218             else:
219                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
220                     "Expression must return list, object, string or null")
221
222             if pattern is not None:
223                 if "${" in pattern or "$(" in pattern:
224                     sfname = builder.do_eval(pattern, context=primary)
225                 else:
226                     sfname = substitute(primary["basename"], pattern)
227
228                 if sfname is None:
229                     continue
230
231                 p_location = primary["location"]
232                 if "/" in p_location:
233                     sfpath = (
234                         p_location[0 : p_location.rindex("/") + 1]
235                         + sfname
236                     )
237
238             required = builder.do_eval(required, context=primary)
239
240             if fsaccess.exists(sfpath):
241                 if pattern is not None:
242                     found.append({"location": sfpath, "class": "File"})
243                 else:
244                     found.append(sf)
245             elif required:
246                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
247                     "Required secondary file '%s' does not exist" % sfpath)
248
249         primary["secondaryFiles"] = cmap(found)
250         if discovered is not None:
251             discovered[primary["location"]] = primary["secondaryFiles"]
252     elif inputschema["type"] not in primitive_types_set:
253         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
254
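# Illustrative sketch of how secondaryFiles patterns are resolved above
# (filenames are hypothetical).  Plain suffix patterns are appended to the
# primary basename via cwltool's substitute(); each leading "^" strips one
# extension first, and "$()"/"${}" patterns are evaluated with builder.do_eval().
#
#   substitute("sample.bam", ".bai")    # -> "sample.bam.bai"
#   substitute("sample.bam", "^.bai")   # -> "sample.bai"
#
# Each candidate is then checked with fsaccess.exists(); existing files are
# recorded in primary["secondaryFiles"] and, when a 'discovered' dict is
# passed, under the primary file's location.
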
255 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
256     for inputschema in inputs:
257         primary = job_order.get(shortname(inputschema["id"]))
258         if isinstance(primary, (Mapping, Sequence)):
259             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
260
261 def upload_dependencies(arvrunner, name, document_loader,
262                         workflowobj, uri, loadref_run, runtimeContext,
263                         include_primary=True, discovered_secondaryfiles=None):
264     """Upload the dependencies of the workflowobj document to Keep.
265
266     Returns a pathmapper object mapping local paths to keep references.  Also
267     does an in-place update of references in "workflowobj".
268
269     Use scandeps to find $import, $include, $schemas, run, File and Directory
270     fields that represent external references.
271
272     If workflowobj has an "id" field, this will reload the document to ensure
273     it is scanning the raw document prior to preprocessing.
274     """
275
276     loaded = set()
277     def loadref(b, u):
278         joined = document_loader.fetcher.urljoin(b, u)
279         defrg, _ = urllib.parse.urldefrag(joined)
280         if defrg not in loaded:
281             loaded.add(defrg)
282             # Use fetch_text to get raw file (before preprocessing).
283             text = document_loader.fetch_text(defrg)
284             if isinstance(text, bytes):
285                 textIO = StringIO(text.decode('utf-8'))
286             else:
287                 textIO = StringIO(text)
288             yamlloader = YAML(typ='safe', pure=True)
289             return yamlloader.load(textIO)
290         else:
291             return {}
292
293     if loadref_run:
294         loadref_fields = set(("$import", "run"))
295     else:
296         loadref_fields = set(("$import",))
297
298     scanobj = workflowobj
299     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
300         # Need raw file content (before preprocessing) to ensure
301         # that external references in $include and $mixin are captured.
302         scanobj = loadref("", workflowobj["id"])
303
304     metadata = scanobj
305
306     sc_result = scandeps(uri, scanobj,
307                          loadref_fields,
308                          set(("$include", "location")),
309                          loadref, urljoin=document_loader.fetcher.urljoin,
310                          nestdirs=False)
311
312     optional_deps = scandeps(uri, scanobj,
313                                   loadref_fields,
314                                   set(("$schemas",)),
315                                   loadref, urljoin=document_loader.fetcher.urljoin,
316                                   nestdirs=False)
317
318     sc_result.extend(optional_deps)
319
320     sc = []
321     uuids = {}
322
323     def collect_uuids(obj):
324         loc = obj.get("location", "")
325         sp = loc.split(":")
326         if sp[0] == "keep":
327             # Collect collection uuids that need to be resolved to
328             # portable data hashes
329             gp = collection_uuid_pattern.match(loc)
330             if gp:
331                 uuids[gp.groups()[0]] = obj
332             if collectionUUID in obj:
333                 uuids[obj[collectionUUID]] = obj
334
335     def collect_uploads(obj):
336         loc = obj.get("location", "")
337         sp = loc.split(":")
338         if len(sp) < 1:
339             return
340         if sp[0] in ("file", "http", "https"):
341             # Record local files that need to be uploaded;
342             # don't include file literals, keep references, etc.
343             sc.append(obj)
344         collect_uuids(obj)
345
346     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
347     visit_class(sc_result, ("File", "Directory"), collect_uploads)
348
349     # Resolve any collection uuids we found to portable data hashes
350     # and assign them to uuid_map
351     uuid_map = {}
352     fetch_uuids = list(uuids.keys())
353     while fetch_uuids:
354         # For a large number of fetch_uuids, the API server may limit the
355         # response size, so keep fetching until the API server has nothing
356         # more to give us.
357         lookups = arvrunner.api.collections().list(
358             filters=[["uuid", "in", fetch_uuids]],
359             count="none",
360             select=["uuid", "portable_data_hash"]).execute(
361                 num_retries=arvrunner.num_retries)
362
363         if not lookups["items"]:
364             break
365
366         for l in lookups["items"]:
367             uuid_map[l["uuid"]] = l["portable_data_hash"]
368
369         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
370
371     normalizeFilesDirs(sc)
372
373     if include_primary and "id" in workflowobj:
374         sc.append({"class": "File", "location": workflowobj["id"]})
375
376     def visit_default(obj):
377         def defaults_are_optional(f):
378             if "location" not in f and "path" in f:
379                 f["location"] = f["path"]
380                 del f["path"]
381             normalizeFilesDirs(f)
382             optional_deps.append(f)
383         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
384
385     find_defaults(workflowobj, visit_default)
386
387     discovered = {}
388     def discover_default_secondary_files(obj):
389         builder_job_order = {}
390         for t in obj["inputs"]:
391             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
392         # Need to create a builder object to evaluate expressions.
393         builder = make_builder(builder_job_order,
394                                obj.get("hints", []),
395                                obj.get("requirements", []),
396                                ArvRuntimeContext(),
397                                metadata)
398         discover_secondary_files(arvrunner.fs_access,
399                                  builder,
400                                  obj["inputs"],
401                                  builder_job_order,
402                                  discovered)
403
404     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
405     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
406
407     for d in list(discovered):
408         # Only interested in discovered secondaryFiles which are local
409         # files that need to be uploaded.
410         if d.startswith("file:"):
411             sc.extend(discovered[d])
412         else:
413             del discovered[d]
414
415     mapper = ArvPathMapper(arvrunner, sc, "",
416                            "keep:%s",
417                            "keep:%s/%s",
418                            name=name,
419                            single_collection=True,
420                            optional_deps=optional_deps)
421
422     keeprefs = set()
423     def addkeepref(k):
424         if k.startswith("keep:"):
425             keeprefs.add(collection_pdh_pattern.match(k).group(1))
426
427     def setloc(p):
428         loc = p.get("location")
429         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
430             p["location"] = mapper.mapper(p["location"]).resolved
431             addkeepref(p["location"])
432             return
433
434         if not loc:
435             return
436
437         if collectionUUID in p:
438             uuid = p[collectionUUID]
439             if uuid not in uuid_map:
440                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
441                     "Collection uuid %s not found" % uuid)
442             gp = collection_pdh_pattern.match(loc)
443             if gp and uuid_map[uuid] != gp.groups()[0]:
444                 # This file entry has both collectionUUID and a PDH
445                 # location. If the PDH doesn't match the one returned
446                 # by the API server, raise an error.
447                 raise SourceLine(p, "location", validate.ValidationException).makeError(
448                     "Expected collection uuid %s to be %s but API server reported %s" % (
449                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
450
451         gp = collection_uuid_pattern.match(loc)
452         if not gp:
453             # Not a uuid pattern (must be a pdh pattern)
454             addkeepref(p["location"])
455             return
456
457         uuid = gp.groups()[0]
458         if uuid not in uuid_map:
459             raise SourceLine(p, "location", validate.ValidationException).makeError(
460                 "Collection uuid %s not found" % uuid)
461         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
462         p[collectionUUID] = uuid
463
464     visit_class(workflowobj, ("File", "Directory"), setloc)
465     visit_class(discovered, ("File", "Directory"), setloc)
466
467     if discovered_secondaryfiles is not None:
468         for d in discovered:
469             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
470
471     if runtimeContext.copy_deps:
472         # Find referenced collections and copy them into the
473         # destination project, for easy sharing.
474         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
475                                      filters=[["portable_data_hash", "in", list(keeprefs)],
476                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
477                                      select=["uuid", "portable_data_hash", "created_at"]))
478
479         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
480         for kr in keeprefs:
481             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
482                                                   order="created_at desc",
483                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
484                                                    limit=1).execute()
485             if len(col["items"]) == 0:
486                 logger.warning("Cannot find collection with portable data hash %s", kr)
487                 continue
488             col = col["items"][0]
489             try:
490                 arvrunner.api.collections().create(body={"collection": {
491                     "owner_uuid": runtimeContext.project_uuid,
492                     "name": col["name"],
493                     "description": col["description"],
494                     "properties": col["properties"],
495                     "portable_data_hash": col["portable_data_hash"],
496                     "manifest_text": col["manifest_text"],
497                     "storage_classes_desired": col["storage_classes_desired"],
498                     "trash_at": col["trash_at"]
499                 }}, ensure_unique_name=True).execute()
500             except Exception as e:
501                 logger.warning("Unable copy collection to destination: %s", e)
502
503     if "$schemas" in workflowobj:
504         sch = CommentedSeq()
505         for s in workflowobj["$schemas"]:
506             if s in mapper:
507                 sch.append(mapper.mapper(s).resolved)
508         workflowobj["$schemas"] = sch
509
510     return mapper
511
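# Hedged usage sketch (paths and hashes are illustrative): the returned mapper
# can be queried for where each local dependency ended up in Keep.
#
#   mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                False, runtimeContext)
#   mapper.mapper("file:///home/user/inputs/ref.fa").resolved
#   # -> "keep:<portable data hash>/ref.fa"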
512
513 def upload_docker(arvrunner, tool, runtimeContext):
514     """Uploads Docker images used in CommandLineTool objects."""
515
516     if isinstance(tool, CommandLineTool):
517         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
518         if docker_req:
519             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
520                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
521                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
522
523             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
524                                                        runtimeContext.project_uuid,
525                                                        runtimeContext.force_docker_pull,
526                                                        runtimeContext.tmp_outdir_prefix,
527                                                        runtimeContext.match_local_docker,
528                                                        runtimeContext.copy_deps)
529         else:
530             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
531                                                        True,
532                                                        runtimeContext.project_uuid,
533                                                        runtimeContext.force_docker_pull,
534                                                        runtimeContext.tmp_outdir_prefix,
535                                                        runtimeContext.match_local_docker,
536                                                        runtimeContext.copy_deps)
537     elif isinstance(tool, cwltool.workflow.Workflow):
538         for s in tool.steps:
539             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
540
541
542 def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
543     """Create a packed workflow.
544
545     A "packed" workflow is one where all the components have been combined into a single document."""
546
547     rewrites = {}
548     packed = pack(arvrunner.loadingContext, tool.tool["id"],
549                   rewrite_out=rewrites,
550                   loader=tool.doc_loader)
551
552     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
553
554     def visit(v, cur_id):
555         if isinstance(v, dict):
556             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
557                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
558                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
559                 if "id" in v:
560                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
561             if "path" in v and "location" not in v:
562                 v["location"] = v["path"]
563                 del v["path"]
564             if "location" in v and cur_id in merged_map:
565                 if v["location"] in merged_map[cur_id].resolved:
566                     v["location"] = merged_map[cur_id].resolved[v["location"]]
567                 if v["location"] in merged_map[cur_id].secondaryFiles:
568                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
569             if v.get("class") == "DockerRequirement":
570                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
571                                                                                                              runtimeContext.project_uuid,
572                                                                                                              runtimeContext.force_docker_pull,
573                                                                                                              runtimeContext.tmp_outdir_prefix,
574                                                                                                              runtimeContext.match_local_docker,
575                                                                                                              runtimeContext.copy_deps)
576             for l in v:
577                 visit(v[l], cur_id)
578         if isinstance(v, list):
579             for l in v:
580                 visit(l, cur_id)
581     visit(packed, None)
582     return packed
583
584
585 def tag_git_version(packed, tool):
586     if tool.tool["id"].startswith("file://"):
587         path = os.path.dirname(tool.tool["id"][7:])
588         try:
589             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip().decode("utf-8")
590         except (OSError, subprocess.CalledProcessError):
591             pass
592         else:
593             packed["http://schema.org/version"] = githash
594
595
596 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
597     """Upload local files referenced in the input object and return updated input
598     object with 'location' updated to the proper keep references.
599     """
600
601     # Make a copy of the job order and set defaults.
602     builder_job_order = copy.copy(job_order)
603
604     # fill_in_defaults throws an error if there are any
605     # missing required parameters; we don't want it to do that,
606     # so make them all optional.
607     inputs_copy = copy.deepcopy(tool.tool["inputs"])
608     for i in inputs_copy:
609         if "null" not in i["type"]:
610             i["type"] = ["null"] + aslist(i["type"])
611
612     fill_in_defaults(inputs_copy,
613                      builder_job_order,
614                      arvrunner.fs_access)
615     # Need to create a builder object to evaluate expressions.
616     builder = make_builder(builder_job_order,
617                            tool.hints,
618                            tool.requirements,
619                            ArvRuntimeContext(),
620                            tool.metadata)
621     # Now update job_order with secondaryFiles
622     discover_secondary_files(arvrunner.fs_access,
623                              builder,
624                              tool.tool["inputs"],
625                              job_order)
626
627     jobmapper = upload_dependencies(arvrunner,
628                                     name,
629                                     tool.doc_loader,
630                                     job_order,
631                                     job_order.get("id", "#"),
632                                     False,
633                                     runtimeContext)
634
635     if "id" in job_order:
636         del job_order["id"]
637
638     # Need to filter this out; it gets added by cwltool when providing
639     # parameters on the command line.
640     if "job_order" in job_order:
641         del job_order["job_order"]
642
643     return job_order
644
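# Hedged sketch (paths and hashes are illustrative): after upload_job_order,
# local file references in the input object point at Keep.
#
#   job_order = {"reference": {"class": "File", "location": "file:///tmp/ref.fa"}}
#   job_order = upload_job_order(arvrunner, "input object", tool, job_order,
#                                runtimeContext)
#   # job_order["reference"]["location"] == "keep:<portable data hash>/ref.fa"
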
645 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
646
647 def upload_workflow_deps(arvrunner, tool, runtimeContext):
648     # Ensure that Docker images needed by this workflow are available
649
650     upload_docker(arvrunner, tool, runtimeContext)
651
652     document_loader = tool.doc_loader
653
654     merged_map = {}
655
656     def upload_tool_deps(deptool):
657         if "id" in deptool:
658             discovered_secondaryfiles = {}
659             pm = upload_dependencies(arvrunner,
660                                      "%s dependencies" % (shortname(deptool["id"])),
661                                      document_loader,
662                                      deptool,
663                                      deptool["id"],
664                                      False,
665                                      runtimeContext,
666                                      include_primary=False,
667                                      discovered_secondaryfiles=discovered_secondaryfiles)
668             document_loader.idx[deptool["id"]] = deptool
669             toolmap = {}
670             for k,v in pm.items():
671                 toolmap[k] = v.resolved
672             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
673
674     tool.visit(upload_tool_deps)
675
676     return merged_map
677
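# Hedged sketch of the returned structure (keys are illustrative): merged_map
# maps each tool/workflow document id to a FileUpdates tuple of resolved
# locations and discovered secondary files.
#
#   merged_map = {
#       "file:///home/user/wf.cwl": FileUpdates(
#           resolved={"file:///home/user/ref.fa": "keep:<pdh>/ref.fa"},
#           secondaryFiles={}),
#   }
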
678 def arvados_jobs_image(arvrunner, img, runtimeContext):
679     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
680
681     try:
682         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
683                                                           True,
684                                                           runtimeContext.project_uuid,
685                                                           runtimeContext.force_docker_pull,
686                                                           runtimeContext.tmp_outdir_prefix,
687                                                           runtimeContext.match_local_docker,
688                                                           runtimeContext.copy_deps)
689     except Exception as e:
690         raise Exception("Docker image %s is not available\n%s" % (img, e) )
691
692
693 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
694     collection = arvados.collection.Collection(api_client=arvrunner.api,
695                                                keep_client=arvrunner.keep_client,
696                                                num_retries=arvrunner.num_retries)
697     with collection.open("workflow.cwl", "w") as f:
698         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
699
700     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
701                ["name", "like", name+"%"]]
702     if runtimeContext.project_uuid:
703         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
704     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
705
706     if exists["items"]:
707         logger.info("Using collection %s", exists["items"][0]["uuid"])
708     else:
709         collection.save_new(name=name,
710                             owner_uuid=runtimeContext.project_uuid,
711                             ensure_unique_name=True,
712                             num_retries=arvrunner.num_retries)
713         logger.info("Uploaded to %s", collection.manifest_locator())
714
715     return collection.portable_data_hash()
716
717
718 class Runner(Process):
719     """Base class for runner processes, which submit an instance of
720     arvados-cwl-runner and wait for the final result."""
721
722     def __init__(self, runner, updated_tool,
723                  tool, loadingContext, enable_reuse,
724                  output_name, output_tags, submit_runner_ram=0,
725                  name=None, on_error=None, submit_runner_image=None,
726                  intermediate_output_ttl=0, merged_map=None,
727                  priority=None, secret_store=None,
728                  collection_cache_size=256,
729                  collection_cache_is_default=True):
730
731         loadingContext = loadingContext.copy()
732         loadingContext.metadata = updated_tool.metadata.copy()
733
734         super(Runner, self).__init__(updated_tool.tool, loadingContext)
735
736         self.arvrunner = runner
737         self.embedded_tool = tool
738         self.job_order = None
739         self.running = False
740         if enable_reuse:
741             # If reuse is permitted by command line arguments but
742             # disabled by the workflow itself, disable it.
743             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
744             if reuse_req:
745                 enable_reuse = reuse_req["enableReuse"]
746         self.enable_reuse = enable_reuse
747         self.uuid = None
748         self.final_output = None
749         self.output_name = output_name
750         self.output_tags = output_tags
751         self.name = name
752         self.on_error = on_error
753         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
754         self.intermediate_output_ttl = intermediate_output_ttl
755         self.priority = priority
756         self.secret_store = secret_store
757         self.enable_dev = loadingContext.enable_dev
758
759         self.submit_runner_cores = 1
760         self.submit_runner_ram = 1024  # default 1 GiB
761         self.collection_cache_size = collection_cache_size
762
763         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
764         if runner_resource_req:
765             if runner_resource_req.get("coresMin"):
766                 self.submit_runner_cores = runner_resource_req["coresMin"]
767             if runner_resource_req.get("ramMin"):
768                 self.submit_runner_ram = runner_resource_req["ramMin"]
769             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
770                 self.collection_cache_size = runner_resource_req["keep_cache"]
771
772         if submit_runner_ram:
773             # Command line / initializer overrides default and/or spec from workflow
774             self.submit_runner_ram = submit_runner_ram
775
776         if self.submit_runner_ram <= 0:
777             raise Exception("Value of submit-runner-ram must be greater than zero")
778
779         if self.submit_runner_cores <= 0:
780             raise Exception("Value of submit-runner-cores must be greater than zero")
781
782         self.merged_map = merged_map or {}
783
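    # The runner resource settings above can also come from the workflow via a
    # hint; a hedged YAML sketch (values illustrative), where "arv" maps to the
    # http://arvados.org/cwl# namespace:
    #
    #   hints:
    #     arv:WorkflowRunnerResources:
    #       coresMin: 2
    #       ramMin: 2048
    #       keep_cache: 512
    #
    # An explicit submit_runner_ram argument (e.g. from the command line) still
    # overrides the ramMin value from the hint, as handled above.
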
784     def job(self,
785             job_order,         # type: Mapping[Text, Text]
786             output_callbacks,  # type: Callable[[Any, Any], Any]
787             runtimeContext     # type: RuntimeContext
788            ):  # type: (...) -> Generator[Any, None, None]
789         self.job_order = job_order
790         self._init_job(job_order, runtimeContext)
791         yield self
792
793     def update_pipeline_component(self, record):
794         pass
795
796     def done(self, record):
797         """Base method for handling a completed runner."""
798
799         try:
800             if record["state"] == "Complete":
801                 if record.get("exit_code") is not None:
802                     if record["exit_code"] == 33:
803                         processStatus = "UnsupportedRequirement"
804                     elif record["exit_code"] == 0:
805                         processStatus = "success"
806                     else:
807                         processStatus = "permanentFail"
808                 else:
809                     processStatus = "success"
810             else:
811                 processStatus = "permanentFail"
812
813             outputs = {}
814
815             if processStatus == "permanentFail":
816                 logc = arvados.collection.CollectionReader(record["log"],
817                                                            api_client=self.arvrunner.api,
818                                                            keep_client=self.arvrunner.keep_client,
819                                                            num_retries=self.arvrunner.num_retries)
820                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
821
822             self.final_output = record["output"]
823             outc = arvados.collection.CollectionReader(self.final_output,
824                                                        api_client=self.arvrunner.api,
825                                                        keep_client=self.arvrunner.keep_client,
826                                                        num_retries=self.arvrunner.num_retries)
827             if "cwl.output.json" in outc:
828                 with outc.open("cwl.output.json", "rb") as f:
829                     if f.size() > 0:
830                         outputs = json.loads(f.read().decode())
831             def keepify(fileobj):
832                 path = fileobj["location"]
833                 if not path.startswith("keep:"):
834                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
835             adjustFileObjs(outputs, keepify)
836             adjustDirObjs(outputs, keepify)
837         except Exception:
838             logger.exception("[%s] While getting final output object", self.name)
839             self.arvrunner.output_callback({}, "permanentFail")
840         else:
841             self.arvrunner.output_callback(outputs, processStatus)