19280: Only scan tools
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import (
21     Any,
22     Callable,
23     Dict,
24     Iterable,
25     Iterator,
26     List,
27     Mapping,
28     MutableMapping,
29     Sequence,
30     MutableSequence,
31     Optional,
32     Set,
33     Sized,
34     Tuple,
35     Type,
36     Union,
37     cast,
38 )
39 from cwltool.utils import (
40     CWLObjectType,
41     CWLOutputAtomType,
42     CWLOutputType,
43 )
44
45 if os.name == "posix" and sys.version_info[0] < 3:
46     import subprocess32 as subprocess
47 else:
48     import subprocess
49
50 from schema_salad.sourceline import SourceLine, cmap
51
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55                              shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
63
64 import arvados.collection
65 import arvados.util
66 from .util import collectionUUID
67 from ruamel.yaml import YAML
68 from ruamel.yaml.comments import CommentedMap, CommentedSeq
69
70 import arvados_cwl.arvdocker
71 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
72 from ._version import __version__
73 from . import done
74 from . context import ArvRuntimeContext
75 from .perf import Perf
76
77 logger = logging.getLogger('arvados.cwl-runner')
78 metrics = logging.getLogger('arvados.cwl-runner.metrics')
79
80 def trim_anonymous_location(obj):
81     """Remove 'location' field from File and Directory literals.
82
83     To make internal handling easier, literals are assigned a random id for
84     'location'.  However, when writing the record back out, this can break
85     reproducibility.  Since it is valid for literals not have a 'location'
86     field, remove it.
87
88     """
89
90     if obj.get("location", "").startswith("_:"):
91         del obj["location"]
92
93
94 def remove_redundant_fields(obj):
95     for field in ("path", "nameext", "nameroot", "dirname"):
96         if field in obj:
97             del obj[field]
98
99
100 def find_defaults(d, op):
101     if isinstance(d, list):
102         for i in d:
103             find_defaults(i, op)
104     elif isinstance(d, dict):
105         if "default" in d:
106             op(d)
107         else:
108             for i in viewvalues(d):
109                 find_defaults(i, op)
110
111 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
112     return Builder(
113                  job=joborder,
114                  files=[],               # type: List[Dict[Text, Text]]
115                  bindings=[],            # type: List[Dict[Text, Any]]
116                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
117                  names=None,               # type: Names
118                  requirements=requirements,        # type: List[Dict[Text, Any]]
119                  hints=hints,               # type: List[Dict[Text, Any]]
120                  resources={},           # type: Dict[str, int]
121                  mutation_manager=None,    # type: Optional[MutationManager]
122                  formatgraph=None,         # type: Optional[Graph]
123                  make_fs_access=None,      # type: Type[StdFsAccess]
124                  fs_access=None,           # type: StdFsAccess
125                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
126                  timeout=runtimeContext.eval_timeout,             # type: float
127                  debug=runtimeContext.debug,               # type: bool
128                  js_console=runtimeContext.js_console,          # type: bool
129                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
130                  loadListing="",         # type: Text
131                  outdir="",              # type: Text
132                  tmpdir="",              # type: Text
133                  stagedir="",            # type: Text
134                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
135                  container_engine="docker"
136                 )
137
138 def search_schemadef(name, reqs):
139     for r in reqs:
140         if r["class"] == "SchemaDefRequirement":
141             for sd in r["types"]:
142                 if sd["name"] == name:
143                     return sd
144     return None
145
146 primitive_types_set = frozenset(("null", "boolean", "int", "long",
147                                  "float", "double", "string", "record",
148                                  "array", "enum"))
149
150 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
151     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
152         # union type, collect all possible secondaryFiles
153         for i in inputschema:
154             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
155         return
156
157     if inputschema == "File":
158         inputschema = {"type": "File"}
159
160     if isinstance(inputschema, basestring):
161         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
162         if sd:
163             inputschema = sd
164         else:
165             return
166
167     if "secondaryFiles" in inputschema:
168         # set secondaryFiles, may be inherited by compound types.
169         secondaryspec = inputschema["secondaryFiles"]
170
171     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
172         not isinstance(inputschema["type"], basestring)):
173         # compound type (union, array, record)
174         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
175
176     elif (inputschema["type"] == "record" and
177           isinstance(primary, Mapping)):
178         #
179         # record type, find secondary files associated with fields.
180         #
181         for f in inputschema["fields"]:
182             p = primary.get(shortname(f["name"]))
183             if p:
184                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
185
186     elif (inputschema["type"] == "array" and
187           isinstance(primary, Sequence)):
188         #
189         # array type, find secondary files of elements
190         #
191         for p in primary:
192             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
193
194     elif (inputschema["type"] == "File" and
195           isinstance(primary, Mapping) and
196           primary.get("class") == "File"):
197
198         if "secondaryFiles" in primary or not secondaryspec:
199             # Nothing to do.
200             return
201
202         #
203         # Found a file, check for secondaryFiles
204         #
205         specs = []
206         primary["secondaryFiles"] = secondaryspec
207         for i, sf in enumerate(aslist(secondaryspec)):
208             if builder.cwlVersion == "v1.0":
209                 pattern = sf
210             else:
211                 pattern = sf["pattern"]
212             if pattern is None:
213                 continue
214             if isinstance(pattern, list):
215                 specs.extend(pattern)
216             elif isinstance(pattern, dict):
217                 specs.append(pattern)
218             elif isinstance(pattern, str):
219                 if builder.cwlVersion == "v1.0":
220                     specs.append({"pattern": pattern, "required": True})
221                 else:
222                     specs.append({"pattern": pattern, "required": sf.get("required")})
223             else:
224                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
225                     "Expression must return list, object, string or null")
226
227         found = []
228         for i, sf in enumerate(specs):
229             if isinstance(sf, dict):
230                 if sf.get("class") == "File":
231                     pattern = None
232                     if sf.get("location") is None:
233                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
234                             "File object is missing 'location': %s" % sf)
235                     sfpath = sf["location"]
236                     required = True
237                 else:
238                     pattern = sf["pattern"]
239                     required = sf.get("required")
240             elif isinstance(sf, str):
241                 pattern = sf
242                 required = True
243             else:
244                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
245                     "Expression must return list, object, string or null")
246
247             if pattern is not None:
248                 if "${" in pattern or "$(" in pattern:
249                     sfname = builder.do_eval(pattern, context=primary)
250                 else:
251                     sfname = substitute(primary["basename"], pattern)
252
253                 if sfname is None:
254                     continue
255
256                 p_location = primary["location"]
257                 if "/" in p_location:
258                     sfpath = (
259                         p_location[0 : p_location.rindex("/") + 1]
260                         + sfname
261                     )
262
263             required = builder.do_eval(required, context=primary)
264
265             if fsaccess.exists(sfpath):
266                 if pattern is not None:
267                     found.append({"location": sfpath, "class": "File"})
268                 else:
269                     found.append(sf)
270             elif required:
271                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272                     "Required secondary file '%s' does not exist" % sfpath)
273
274         primary["secondaryFiles"] = cmap(found)
275         if discovered is not None:
276             discovered[primary["location"]] = primary["secondaryFiles"]
277     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
278         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
279
280 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
281     for inputschema in inputs:
282         primary = job_order.get(shortname(inputschema["id"]))
283         if isinstance(primary, (Mapping, Sequence)):
284             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
285
286 def upload_dependencies(arvrunner, name, document_loader,
287                         workflowobj, uri, loadref_run, runtimeContext,
288                         include_primary=True, discovered_secondaryfiles=None,
289                         cache=None):
290     """Upload the dependencies of the workflowobj document to Keep.
291
292     Returns a pathmapper object mapping local paths to keep references.  Also
293     does an in-place update of references in "workflowobj".
294
295     Use scandeps to find $import, $include, $schemas, run, File and Directory
296     fields that represent external references.
297
298     If workflowobj has an "id" field, this will reload the document to ensure
299     it is scanning the raw document prior to preprocessing.
300     """
301
302     loaded = set()
303     def loadref(b, u):
304         joined = document_loader.fetcher.urljoin(b, u)
305         defrg, _ = urllib.parse.urldefrag(joined)
306         if defrg not in loaded:
307             loaded.add(defrg)
308             if cache is not None and defrg in cache:
309                 return cache[defrg]
310             # Use fetch_text to get raw file (before preprocessing).
311             text = document_loader.fetch_text(defrg)
312             if isinstance(text, bytes):
313                 textIO = StringIO(text.decode('utf-8'))
314             else:
315                 textIO = StringIO(text)
316             yamlloader = YAML(typ='safe', pure=True)
317             result = yamlloader.load(textIO)
318             if cache is not None:
319                 cache[defrg] = result
320             return result
321         else:
322             return {}
323
324     if loadref_run:
325         loadref_fields = set(("$import", "run"))
326     else:
327         loadref_fields = set(("$import",))
328
329     scanobj = workflowobj
330     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
331         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
332         if cache is not None and defrg in cache:
333             # if we haven't seen this file before, want raw file
334             # content (before preprocessing) to ensure that external
335             # references like $include haven't already been inlined.
336             scanobj = loadref("", workflowobj["id"])
337
338     metadata = scanobj
339
340     with Perf(metrics, "scandeps include, location"):
341         sc_result = scandeps(uri, scanobj,
342                              loadref_fields,
343                              set(("$include", "location")),
344                              loadref, urljoin=document_loader.fetcher.urljoin,
345                              nestdirs=False)
346
347     with Perf(metrics, "scandeps $schemas"):
348         optional_deps = scandeps(uri, scanobj,
349                                       loadref_fields,
350                                       set(("$schemas",)),
351                                       loadref, urljoin=document_loader.fetcher.urljoin,
352                                       nestdirs=False)
353
354     if sc_result is None:
355         sc_result = []
356
357     if optional_deps is None:
358         optional_deps = []
359
360     if optional_deps:
361         sc_result.extend(optional_deps)
362
363     sc = []
364     uuids = {}
365
366     def collect_uuids(obj):
367         loc = obj.get("location", "")
368         sp = loc.split(":")
369         if sp[0] == "keep":
370             # Collect collection uuids that need to be resolved to
371             # portable data hashes
372             gp = collection_uuid_pattern.match(loc)
373             if gp:
374                 uuids[gp.groups()[0]] = obj
375             if collectionUUID in obj:
376                 uuids[obj[collectionUUID]] = obj
377
378     def collect_uploads(obj):
379         loc = obj.get("location", "")
380         sp = loc.split(":")
381         if len(sp) < 1:
382             return
383         if sp[0] in ("file", "http", "https"):
384             # Record local files than need to be uploaded,
385             # don't include file literals, keep references, etc.
386             sc.append(obj)
387         collect_uuids(obj)
388
389     with Perf(metrics, "collect uuids"):
390         visit_class(workflowobj, ("File", "Directory"), collect_uuids)
391
392     with Perf(metrics, "collect uploads"):
393         visit_class(sc_result, ("File", "Directory"), collect_uploads)
394
395     # Resolve any collection uuids we found to portable data hashes
396     # and assign them to uuid_map
397     uuid_map = {}
398     fetch_uuids = list(uuids.keys())
399     with Perf(metrics, "fetch_uuids"):
400         while fetch_uuids:
401             # For a large number of fetch_uuids, API server may limit
402             # response size, so keep fetching from API server has nothing
403             # more to give us.
404             lookups = arvrunner.api.collections().list(
405                 filters=[["uuid", "in", fetch_uuids]],
406                 count="none",
407                 select=["uuid", "portable_data_hash"]).execute(
408                     num_retries=arvrunner.num_retries)
409
410             if not lookups["items"]:
411                 break
412
413             for l in lookups["items"]:
414                 uuid_map[l["uuid"]] = l["portable_data_hash"]
415
416             fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
417
418     normalizeFilesDirs(sc)
419
420     if include_primary and "id" in workflowobj:
421         sc.append({"class": "File", "location": workflowobj["id"]})
422
423     def visit_default(obj):
424         def defaults_are_optional(f):
425             if "location" not in f and "path" in f:
426                 f["location"] = f["path"]
427                 del f["path"]
428             normalizeFilesDirs(f)
429             optional_deps.append(f)
430         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
431
432     find_defaults(workflowobj, visit_default)
433
434     discovered = {}
435     def discover_default_secondary_files(obj):
436         builder_job_order = {}
437         for t in obj["inputs"]:
438             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
439         # Need to create a builder object to evaluate expressions.
440         builder = make_builder(builder_job_order,
441                                obj.get("hints", []),
442                                obj.get("requirements", []),
443                                ArvRuntimeContext(),
444                                metadata)
445         discover_secondary_files(arvrunner.fs_access,
446                                  builder,
447                                  obj["inputs"],
448                                  builder_job_order,
449                                  discovered)
450
451     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
452     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
453
454     for d in list(discovered):
455         # Only interested in discovered secondaryFiles which are local
456         # files that need to be uploaded.
457         if d.startswith("file:"):
458             sc.extend(discovered[d])
459         else:
460             del discovered[d]
461
462     with Perf(metrics, "mapper"):
463         mapper = ArvPathMapper(arvrunner, sc, "",
464                                "keep:%s",
465                                "keep:%s/%s",
466                                name=name,
467                                single_collection=True,
468                                optional_deps=optional_deps)
469
470     keeprefs = set()
471     def addkeepref(k):
472         if k.startswith("keep:"):
473             keeprefs.add(collection_pdh_pattern.match(k).group(1))
474
475     def setloc(p):
476         loc = p.get("location")
477         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
478             p["location"] = mapper.mapper(p["location"]).resolved
479             addkeepref(p["location"])
480             return
481
482         if not loc:
483             return
484
485         if collectionUUID in p:
486             uuid = p[collectionUUID]
487             if uuid not in uuid_map:
488                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
489                     "Collection uuid %s not found" % uuid)
490             gp = collection_pdh_pattern.match(loc)
491             if gp and uuid_map[uuid] != gp.groups()[0]:
492                 # This file entry has both collectionUUID and a PDH
493                 # location. If the PDH doesn't match the one returned
494                 # the API server, raise an error.
495                 raise SourceLine(p, "location", validate.ValidationException).makeError(
496                     "Expected collection uuid %s to be %s but API server reported %s" % (
497                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
498
499         gp = collection_uuid_pattern.match(loc)
500         if not gp:
501             # Not a uuid pattern (must be a pdh pattern)
502             addkeepref(p["location"])
503             return
504
505         uuid = gp.groups()[0]
506         if uuid not in uuid_map:
507             raise SourceLine(p, "location", validate.ValidationException).makeError(
508                 "Collection uuid %s not found" % uuid)
509         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
510         p[collectionUUID] = uuid
511
512     with Perf(metrics, "setloc"):
513         visit_class(workflowobj, ("File", "Directory"), setloc)
514         visit_class(discovered, ("File", "Directory"), setloc)
515
516     if discovered_secondaryfiles is not None:
517         for d in discovered:
518             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
519
520     if runtimeContext.copy_deps:
521         # Find referenced collections and copy them into the
522         # destination project, for easy sharing.
523         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
524                                      filters=[["portable_data_hash", "in", list(keeprefs)],
525                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
526                                      select=["uuid", "portable_data_hash", "created_at"]))
527
528         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
529         for kr in keeprefs:
530             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
531                                                   order="created_at desc",
532                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
533                                                    limit=1).execute()
534             if len(col["items"]) == 0:
535                 logger.warning("Cannot find collection with portable data hash %s", kr)
536                 continue
537             col = col["items"][0]
538             try:
539                 arvrunner.api.collections().create(body={"collection": {
540                     "owner_uuid": runtimeContext.project_uuid,
541                     "name": col["name"],
542                     "description": col["description"],
543                     "properties": col["properties"],
544                     "portable_data_hash": col["portable_data_hash"],
545                     "manifest_text": col["manifest_text"],
546                     "storage_classes_desired": col["storage_classes_desired"],
547                     "trash_at": col["trash_at"]
548                 }}, ensure_unique_name=True).execute()
549             except Exception as e:
550                 logger.warning("Unable copy collection to destination: %s", e)
551
552     if "$schemas" in workflowobj:
553         sch = CommentedSeq()
554         for s in workflowobj["$schemas"]:
555             if s in mapper:
556                 sch.append(mapper.mapper(s).resolved)
557         workflowobj["$schemas"] = sch
558
559     return mapper
560
561
562 def upload_docker(arvrunner, tool, runtimeContext):
563     """Uploads Docker images used in CommandLineTool objects."""
564
565     if isinstance(tool, CommandLineTool):
566         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
567         if docker_req:
568             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
569                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
570                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
571
572             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
573                                                        runtimeContext.project_uuid,
574                                                        runtimeContext.force_docker_pull,
575                                                        runtimeContext.tmp_outdir_prefix,
576                                                        runtimeContext.match_local_docker,
577                                                        runtimeContext.copy_deps)
578         else:
579             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
580                                                        True,
581                                                        runtimeContext.project_uuid,
582                                                        runtimeContext.force_docker_pull,
583                                                        runtimeContext.tmp_outdir_prefix,
584                                                        runtimeContext.match_local_docker,
585                                                        runtimeContext.copy_deps)
586     elif isinstance(tool, cwltool.workflow.Workflow):
587         for s in tool.steps:
588             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
589
590
591 def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
592     """Create a packed workflow.
593
594     A "packed" workflow is one where all the components have been combined into a single document."""
595
596     rewrites = {}
597     packed = pack(arvrunner.loadingContext, tool.tool["id"],
598                   rewrite_out=rewrites,
599                   loader=tool.doc_loader)
600
601     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
602
603     def visit(v, cur_id):
604         if isinstance(v, dict):
605             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
606                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
607                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
608                 if "id" in v:
609                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
610             if "path" in v and "location" not in v:
611                 v["location"] = v["path"]
612                 del v["path"]
613             if "location" in v and cur_id in merged_map:
614                 if v["location"] in merged_map[cur_id].resolved:
615                     v["location"] = merged_map[cur_id].resolved[v["location"]]
616                 if v["location"] in merged_map[cur_id].secondaryFiles:
617                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
618             if v.get("class") == "DockerRequirement":
619                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
620                                                                                                              runtimeContext.project_uuid,
621                                                                                                              runtimeContext.force_docker_pull,
622                                                                                                              runtimeContext.tmp_outdir_prefix,
623                                                                                                              runtimeContext.match_local_docker,
624                                                                                                              runtimeContext.copy_deps)
625             for l in v:
626                 visit(v[l], cur_id)
627         if isinstance(v, list):
628             for l in v:
629                 visit(l, cur_id)
630     visit(packed, None)
631     return packed
632
633
634 def tag_git_version(packed):
635     if tool.tool["id"].startswith("file://"):
636         path = os.path.dirname(tool.tool["id"][7:])
637         try:
638             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
639         except (OSError, subprocess.CalledProcessError):
640             pass
641         else:
642             packed["http://schema.org/version"] = githash
643
644
645 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
646     """Upload local files referenced in the input object and return updated input
647     object with 'location' updated to the proper keep references.
648     """
649
650     # Make a copy of the job order and set defaults.
651     builder_job_order = copy.copy(job_order)
652
653     # fill_in_defaults throws an error if there are any
654     # missing required parameters, we don't want it to do that
655     # so make them all optional.
656     inputs_copy = copy.deepcopy(tool.tool["inputs"])
657     for i in inputs_copy:
658         if "null" not in i["type"]:
659             i["type"] = ["null"] + aslist(i["type"])
660
661     fill_in_defaults(inputs_copy,
662                      builder_job_order,
663                      arvrunner.fs_access)
664     # Need to create a builder object to evaluate expressions.
665     builder = make_builder(builder_job_order,
666                            tool.hints,
667                            tool.requirements,
668                            ArvRuntimeContext(),
669                            tool.metadata)
670     # Now update job_order with secondaryFiles
671     discover_secondary_files(arvrunner.fs_access,
672                              builder,
673                              tool.tool["inputs"],
674                              job_order)
675
676     jobmapper = upload_dependencies(arvrunner,
677                                     name,
678                                     tool.doc_loader,
679                                     job_order,
680                                     job_order.get("id", "#"),
681                                     False,
682                                     runtimeContext)
683
684     if "id" in job_order:
685         del job_order["id"]
686
687     # Need to filter this out, gets added by cwltool when providing
688     # parameters on the command line.
689     if "job_order" in job_order:
690         del job_order["job_order"]
691
692     return job_order
693
694 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
695
696 def upload_workflow_deps(arvrunner, tool, runtimeContext):
697     # Ensure that Docker images needed by this workflow are available
698
699     with Perf(metrics, "upload_docker"):
700         upload_docker(arvrunner, tool, runtimeContext)
701
702     document_loader = tool.doc_loader
703
704     merged_map = {}
705     tool_dep_cache = {}
706     def upload_tool_deps(deptool):
707         if "id" in deptool:
708             discovered_secondaryfiles = {}
709             with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
710                 pm = upload_dependencies(arvrunner,
711                                          "%s dependencies" % (shortname(deptool["id"])),
712                                          document_loader,
713                                          deptool,
714                                          deptool["id"],
715                                          False,
716                                          runtimeContext,
717                                          include_primary=False,
718                                          discovered_secondaryfiles=discovered_secondaryfiles,
719                                          cache=tool_dep_cache)
720             document_loader.idx[deptool["id"]] = deptool
721             toolmap = {}
722             for k,v in pm.items():
723                 toolmap[k] = v.resolved
724             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
725
726     tool.visit(upload_tool_deps)
727
728     return merged_map
729
730 def arvados_jobs_image(arvrunner, img, runtimeContext):
731     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
732
733     try:
734         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
735                                                           True,
736                                                           runtimeContext.project_uuid,
737                                                           runtimeContext.force_docker_pull,
738                                                           runtimeContext.tmp_outdir_prefix,
739                                                           runtimeContext.match_local_docker,
740                                                           runtimeContext.copy_deps)
741     except Exception as e:
742         raise Exception("Docker image %s is not available\n%s" % (img, e) )
743
744
745 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
746     collection = arvados.collection.Collection(api_client=arvrunner.api,
747                                                keep_client=arvrunner.keep_client,
748                                                num_retries=arvrunner.num_retries)
749     with collection.open("workflow.cwl", "w") as f:
750         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
751
752     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
753                ["name", "like", name+"%"]]
754     if runtimeContext.project_uuid:
755         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
756     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
757
758     if exists["items"]:
759         logger.info("Using collection %s", exists["items"][0]["uuid"])
760     else:
761         collection.save_new(name=name,
762                             owner_uuid=runtimeContext.project_uuid,
763                             ensure_unique_name=True,
764                             num_retries=arvrunner.num_retries)
765         logger.info("Uploaded to %s", collection.manifest_locator())
766
767     return collection.portable_data_hash()
768
769
770 class Runner(Process):
771     """Base class for runner processes, which submit an instance of
772     arvados-cwl-runner and wait for the final result."""
773
774     def __init__(self, runner, updated_tool,
775                  tool, loadingContext, enable_reuse,
776                  output_name, output_tags, submit_runner_ram=0,
777                  name=None, on_error=None, submit_runner_image=None,
778                  intermediate_output_ttl=0, merged_map=None,
779                  priority=None, secret_store=None,
780                  collection_cache_size=256,
781                  collection_cache_is_default=True):
782
783         loadingContext = loadingContext.copy()
784         loadingContext.metadata = updated_tool.metadata.copy()
785
786         super(Runner, self).__init__(updated_tool.tool, loadingContext)
787
788         self.arvrunner = runner
789         self.embedded_tool = tool
790         self.job_order = None
791         self.running = False
792         if enable_reuse:
793             # If reuse is permitted by command line arguments but
794             # disabled by the workflow itself, disable it.
795             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
796             if reuse_req:
797                 enable_reuse = reuse_req["enableReuse"]
798         self.enable_reuse = enable_reuse
799         self.uuid = None
800         self.final_output = None
801         self.output_name = output_name
802         self.output_tags = output_tags
803         self.name = name
804         self.on_error = on_error
805         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
806         self.intermediate_output_ttl = intermediate_output_ttl
807         self.priority = priority
808         self.secret_store = secret_store
809         self.enable_dev = loadingContext.enable_dev
810
811         self.submit_runner_cores = 1
812         self.submit_runner_ram = 1024  # defaut 1 GiB
813         self.collection_cache_size = collection_cache_size
814
815         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
816         if runner_resource_req:
817             if runner_resource_req.get("coresMin"):
818                 self.submit_runner_cores = runner_resource_req["coresMin"]
819             if runner_resource_req.get("ramMin"):
820                 self.submit_runner_ram = runner_resource_req["ramMin"]
821             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
822                 self.collection_cache_size = runner_resource_req["keep_cache"]
823
824         if submit_runner_ram:
825             # Command line / initializer overrides default and/or spec from workflow
826             self.submit_runner_ram = submit_runner_ram
827
828         if self.submit_runner_ram <= 0:
829             raise Exception("Value of submit-runner-ram must be greater than zero")
830
831         if self.submit_runner_cores <= 0:
832             raise Exception("Value of submit-runner-cores must be greater than zero")
833
834         self.merged_map = merged_map or {}
835
836     def job(self,
837             job_order,         # type: Mapping[Text, Text]
838             output_callbacks,  # type: Callable[[Any, Any], Any]
839             runtimeContext     # type: RuntimeContext
840            ):  # type: (...) -> Generator[Any, None, None]
841         self.job_order = job_order
842         self._init_job(job_order, runtimeContext)
843         yield self
844
845     def update_pipeline_component(self, record):
846         pass
847
848     def done(self, record):
849         """Base method for handling a completed runner."""
850
851         try:
852             if record["state"] == "Complete":
853                 if record.get("exit_code") is not None:
854                     if record["exit_code"] == 33:
855                         processStatus = "UnsupportedRequirement"
856                     elif record["exit_code"] == 0:
857                         processStatus = "success"
858                     else:
859                         processStatus = "permanentFail"
860                 else:
861                     processStatus = "success"
862             else:
863                 processStatus = "permanentFail"
864
865             outputs = {}
866
867             if processStatus == "permanentFail":
868                 logc = arvados.collection.CollectionReader(record["log"],
869                                                            api_client=self.arvrunner.api,
870                                                            keep_client=self.arvrunner.keep_client,
871                                                            num_retries=self.arvrunner.num_retries)
872                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
873
874             self.final_output = record["output"]
875             outc = arvados.collection.CollectionReader(self.final_output,
876                                                        api_client=self.arvrunner.api,
877                                                        keep_client=self.arvrunner.keep_client,
878                                                        num_retries=self.arvrunner.num_retries)
879             if "cwl.output.json" in outc:
880                 with outc.open("cwl.output.json", "rb") as f:
881                     if f.size() > 0:
882                         outputs = json.loads(f.read().decode())
883             def keepify(fileobj):
884                 path = fileobj["location"]
885                 if not path.startswith("keep:"):
886                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
887             adjustFileObjs(outputs, keepify)
888             adjustDirObjs(outputs, keepify)
889         except Exception:
890             logger.exception("[%s] While getting final output object", self.name)
891             self.arvrunner.output_callback({}, "permanentFail")
892         else:
893             self.arvrunner.output_callback(outputs, processStatus)