6b670c73df4e6d58872193897223429337b4402f
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 import arvados.util
43 from .util import collectionUUID
44 from ruamel.yaml import YAML
45 from ruamel.yaml.comments import CommentedMap, CommentedSeq
46
47 import arvados_cwl.arvdocker
48 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
49 from ._version import __version__
50 from . import done
51 from .context import ArvRuntimeContext
52
53 logger = logging.getLogger('arvados.cwl-runner')
54
55 def trim_anonymous_location(obj):
56     """Remove 'location' field from File and Directory literals.
57
58     To make internal handling easier, literals are assigned a random id for
59     'location'.  However, when writing the record back out, this can break
60     reproducibility.  Since it is valid for literals not have a 'location'
61     field, remove it.
62
63     """
64
65     if obj.get("location", "").startswith("_:"):
66         del obj["location"]
67
68
69 def remove_redundant_fields(obj):
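    """Strip derived fields ("path", "nameext", "nameroot", "dirname") from obj if present."""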
70     for field in ("path", "nameext", "nameroot", "dirname"):
71         if field in obj:
72             del obj[field]
73
74
75 def find_defaults(d, op):
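    """Recursively walk lists and dicts in d, calling op() on each dict that has a "default" field."""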
76     if isinstance(d, list):
77         for i in d:
78             find_defaults(i, op)
79     elif isinstance(d, dict):
80         if "default" in d:
81             op(d)
82         else:
83             for i in viewvalues(d):
84                 find_defaults(i, op)
85
86 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
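    """Construct a cwltool Builder with just enough context (job order, hints,
    requirements, runtime options) to evaluate parameter references and expressions."""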
87     return Builder(
88                  job=joborder,
89                  files=[],               # type: List[Dict[Text, Text]]
90                  bindings=[],            # type: List[Dict[Text, Any]]
91                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
92                  names=None,               # type: Names
93                  requirements=requirements,        # type: List[Dict[Text, Any]]
94                  hints=hints,               # type: List[Dict[Text, Any]]
95                  resources={},           # type: Dict[str, int]
96                  mutation_manager=None,    # type: Optional[MutationManager]
97                  formatgraph=None,         # type: Optional[Graph]
98                  make_fs_access=None,      # type: Type[StdFsAccess]
99                  fs_access=None,           # type: StdFsAccess
100                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
101                  timeout=runtimeContext.eval_timeout,             # type: float
102                  debug=runtimeContext.debug,               # type: bool
103                  js_console=runtimeContext.js_console,          # type: bool
104                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
105                  loadListing="",         # type: Text
106                  outdir="",              # type: Text
107                  tmpdir="",              # type: Text
108                  stagedir="",            # type: Text
109                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
110                  container_engine="docker"
111                 )
112
113 def search_schemadef(name, reqs):
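    """Return the type definition named 'name' from any SchemaDefRequirement in reqs, or None if not found."""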
114     for r in reqs:
115         if r["class"] == "SchemaDefRequirement":
116             for sd in r["types"]:
117                 if sd["name"] == name:
118                     return sd
119     return None
120
121 primitive_types_set = frozenset(("null", "boolean", "int", "long",
122                                  "float", "double", "string", "record",
123                                  "array", "enum"))
124
125 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
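    """Evaluate secondaryFiles patterns and attach the results to a primary input value.

    Walks the input schema (unions, named schema defs, records, arrays and File
    types) alongside the corresponding value from the job order, evaluates each
    secondaryFiles pattern, verifies that required secondary files exist, and
    records what was found in 'discovered' (keyed by the primary file's
    location) when that dict is provided.
    """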
126     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
127         # union type, collect all possible secondaryFiles
128         for i in inputschema:
129             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
130         return
131
132     if isinstance(inputschema, basestring):
133         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
134         if sd:
135             inputschema = sd
136         else:
137             return
138
139     if "secondaryFiles" in inputschema:
140         # set secondaryFiles, may be inherited by compound types.
141         secondaryspec = inputschema["secondaryFiles"]
142
143     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
144         not isinstance(inputschema["type"], basestring)):
145         # compound type (union, array, record)
146         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
147
148     elif (inputschema["type"] == "record" and
149           isinstance(primary, Mapping)):
150         #
151         # record type, find secondary files associated with fields.
152         #
153         for f in inputschema["fields"]:
154             p = primary.get(shortname(f["name"]))
155             if p:
156                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
157
158     elif (inputschema["type"] == "array" and
159           isinstance(primary, Sequence)):
160         #
161         # array type, find secondary files of elements
162         #
163         for p in primary:
164             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
165
166     elif (inputschema["type"] == "File" and
167           secondaryspec and
168           isinstance(primary, Mapping) and
169           primary.get("class") == "File" and
170           "secondaryFiles" not in primary):
171         #
172         # Found a file, check for secondaryFiles
173         #
174         specs = []
175         primary["secondaryFiles"] = secondaryspec
176         for i, sf in enumerate(aslist(secondaryspec)):
177             if builder.cwlVersion == "v1.0":
178                 pattern = builder.do_eval(sf, context=primary)
179             else:
180                 pattern = builder.do_eval(sf["pattern"], context=primary)
181             if pattern is None:
182                 continue
183             if isinstance(pattern, list):
184                 specs.extend(pattern)
185             elif isinstance(pattern, dict):
186                 specs.append(pattern)
187             elif isinstance(pattern, str):
188                 if builder.cwlVersion == "v1.0":
189                     specs.append({"pattern": pattern, "required": True})
190                 else:
191                     specs.append({"pattern": pattern, "required": sf.get("required")})
192             else:
193                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
194                     "Expression must return list, object, string or null")
195
196         found = []
197         for i, sf in enumerate(specs):
198             if isinstance(sf, dict):
199                 if sf.get("class") == "File":
200                     pattern = None
201                     if sf.get("location") is None:
202                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
203                             "File object is missing 'location': %s" % sf)
204                     sfpath = sf["location"]
205                     required = True
206                 else:
207                     pattern = sf["pattern"]
208                     required = sf.get("required")
209             elif isinstance(sf, str):
210                 pattern = sf
211                 required = True
212             else:
213                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
214                     "Expression must return list, object, string or null")
215
216             if pattern is not None:
217                 sfpath = substitute(primary["location"], pattern)
218
219             required = builder.do_eval(required, context=primary)
220
221             if fsaccess.exists(sfpath):
222                 if pattern is not None:
223                     found.append({"location": sfpath, "class": "File"})
224                 else:
225                     found.append(sf)
226             elif required:
227                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
228                     "Required secondary file '%s' does not exist" % sfpath)
229
230         primary["secondaryFiles"] = cmap(found)
231         if discovered is not None:
232             discovered[primary["location"]] = primary["secondaryFiles"]
233     elif inputschema["type"] not in primitive_types_set:
234         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
235
236 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
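    """Run set_secondary over every input parameter whose value in job_order is a mapping or sequence."""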
237     for inputschema in inputs:
238         primary = job_order.get(shortname(inputschema["id"]))
239         if isinstance(primary, (Mapping, Sequence)):
240             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
241
242 def upload_dependencies(arvrunner, name, document_loader,
243                         workflowobj, uri, loadref_run,
244                         include_primary=True, discovered_secondaryfiles=None):
245     """Upload the dependencies of the workflowobj document to Keep.
246
247     Returns a pathmapper object mapping local paths to keep references.  Also
248     does an in-place update of references in "workflowobj".
249
250     Use scandeps to find $import, $include, $schemas, run, File and Directory
251     fields that represent external references.
252
253     If workflowobj has an "id" field, this will reload the document to ensure
254     it is scanning the raw document prior to preprocessing.
255     """
256
257     loaded = set()
258     def loadref(b, u):
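        """Fetch and parse the raw (pre-preprocessing) text of a referenced document, returning {} for documents already loaded."""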
259         joined = document_loader.fetcher.urljoin(b, u)
260         defrg, _ = urllib.parse.urldefrag(joined)
261         if defrg not in loaded:
262             loaded.add(defrg)
263             # Use fetch_text to get raw file (before preprocessing).
264             text = document_loader.fetch_text(defrg)
265             if isinstance(text, bytes):
266                 textIO = StringIO(text.decode('utf-8'))
267             else:
268                 textIO = StringIO(text)
269             yamlloader = YAML(typ='safe', pure=True)
270             return yamlloader.load(textIO)
271         else:
272             return {}
273
274     if loadref_run:
275         loadref_fields = set(("$import", "run"))
276     else:
277         loadref_fields = set(("$import",))
278
279     scanobj = workflowobj
280     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
281         # Need raw file content (before preprocessing) to ensure
282         # that external references in $include and $mixin are captured.
283         scanobj = loadref("", workflowobj["id"])
284
285     metadata = scanobj
286
287     sc_result = scandeps(uri, scanobj,
288                          loadref_fields,
289                          set(("$include", "location")),
290                          loadref, urljoin=document_loader.fetcher.urljoin,
291                          nestdirs=False)
292
293     optional_deps = scandeps(uri, scanobj,
294                                   loadref_fields,
295                                   set(("$schemas",)),
296                                   loadref, urljoin=document_loader.fetcher.urljoin,
297                                   nestdirs=False)
298
299     sc_result.extend(optional_deps)
300
301     sc = []
302     uuids = {}
303
304     def collect_uuids(obj):
305         loc = obj.get("location", "")
306         sp = loc.split(":")
307         if sp[0] == "keep":
308             # Collect collection uuids that need to be resolved to
309             # portable data hashes
310             gp = collection_uuid_pattern.match(loc)
311             if gp:
312                 uuids[gp.groups()[0]] = obj
313             if collectionUUID in obj:
314                 uuids[obj[collectionUUID]] = obj
315
316     def collect_uploads(obj):
317         loc = obj.get("location", "")
318         sp = loc.split(":")
319         if len(sp) < 1:
320             return
321         if sp[0] in ("file", "http", "https"):
322         # Record local files that need to be uploaded,
323             # don't include file literals, keep references, etc.
324             sc.append(obj)
325         collect_uuids(obj)
326
327     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
328     visit_class(sc_result, ("File", "Directory"), collect_uploads)
329
330     # Resolve any collection uuids we found to portable data hashes
331     # and assign them to uuid_map
332     uuid_map = {}
333     fetch_uuids = list(uuids.keys())
334     while fetch_uuids:
335         # For a large number of fetch_uuids, the API server may limit the
336         # response size, so keep fetching until the API server has nothing
337         # more to give us.
338         lookups = arvrunner.api.collections().list(
339             filters=[["uuid", "in", fetch_uuids]],
340             count="none",
341             select=["uuid", "portable_data_hash"]).execute(
342                 num_retries=arvrunner.num_retries)
343
344         if not lookups["items"]:
345             break
346
347         for l in lookups["items"]:
348             uuid_map[l["uuid"]] = l["portable_data_hash"]
349
350         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
351
352     normalizeFilesDirs(sc)
353
354     if include_primary and "id" in workflowobj:
355         sc.append({"class": "File", "location": workflowobj["id"]})
356
357     def visit_default(obj):
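        """Normalize File/Directory values found in "default" fields and record them as optional dependencies."""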
358         def defaults_are_optional(f):
359             if "location" not in f and "path" in f:
360                 f["location"] = f["path"]
361                 del f["path"]
362             normalizeFilesDirs(f)
363             optional_deps.append(f)
364         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
365
366     find_defaults(workflowobj, visit_default)
367
368     discovered = {}
369     def discover_default_secondary_files(obj):
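        """Evaluate secondaryFiles expressions against the tool's default input values, accumulating results in 'discovered'."""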
370         builder_job_order = {}
371         for t in obj["inputs"]:
372             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
373         # Need to create a builder object to evaluate expressions.
374         builder = make_builder(builder_job_order,
375                                obj.get("hints", []),
376                                obj.get("requirements", []),
377                                ArvRuntimeContext(),
378                                metadata)
379         discover_secondary_files(arvrunner.fs_access,
380                                  builder,
381                                  obj["inputs"],
382                                  builder_job_order,
383                                  discovered)
384
385     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
386     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
387
388     for d in list(discovered):
389         # Only interested in discovered secondaryFiles which are local
390         # files that need to be uploaded.
391         if d.startswith("file:"):
392             sc.extend(discovered[d])
393         else:
394             del discovered[d]
395
396     mapper = ArvPathMapper(arvrunner, sc, "",
397                            "keep:%s",
398                            "keep:%s/%s",
399                            name=name,
400                            single_collection=True,
401                            optional_deps=optional_deps)
402
403     keeprefs = set()
404     def addkeepref(k):
405         keeprefs.add(collection_pdh_pattern.match(k).group(1))
406
407     def setloc(p):
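        """Rewrite a File/Directory "location" to a keep: reference.

        Local paths are rewritten through the path mapper; locations carrying a
        collection UUID are translated to the portable data hash recorded in
        uuid_map, with a consistency check when both a UUID and a PDH are present.
        """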
408         loc = p.get("location")
409         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
410             p["location"] = mapper.mapper(p["location"]).resolved
411             addkeepref(p["location"])
412             return
413
414         if not loc:
415             return
416
417         if collectionUUID in p:
418             uuid = p[collectionUUID]
419             if uuid not in uuid_map:
420                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
421                     "Collection uuid %s not found" % uuid)
422             gp = collection_pdh_pattern.match(loc)
423             if gp and uuid_map[uuid] != gp.groups()[0]:
424                 # This file entry has both collectionUUID and a PDH
425                 # location. If the PDH doesn't match the one returned by
426                 # the API server, raise an error.
427                 raise SourceLine(p, "location", validate.ValidationException).makeError(
428                     "Expected collection uuid %s to be %s but API server reported %s" % (
429                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
430
431         gp = collection_uuid_pattern.match(loc)
432         if not gp:
433             # Not a uuid pattern (must be a pdh pattern)
434             addkeepref(p["location"])
435             return
436
437         uuid = gp.groups()[0]
438         if uuid not in uuid_map:
439             raise SourceLine(p, "location", validate.ValidationException).makeError(
440                 "Collection uuid %s not found" % uuid)
441         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
442         p[collectionUUID] = uuid
443
444     visit_class(workflowobj, ("File", "Directory"), setloc)
445     visit_class(discovered, ("File", "Directory"), setloc)
446
447     if discovered_secondaryfiles is not None:
448         for d in discovered:
449             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
450
451     if arvrunner.runtimeContext.copy_deps:
452         # Find referenced collections and copy them into the
453         # destination project, for easy sharing.
454         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
455                                      filters=[["portable_data_hash", "in", list(keeprefs)],
456                                               ["owner_uuid", "=", arvrunner.project_uuid]],
457                                      select=["uuid", "portable_data_hash", "created_at"]))
458
459         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
460         for kr in keeprefs:
461             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
462                                                   order="created_at desc",
463                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
464                                                    limit=1).execute()
465             if len(col["items"]) == 0:
466                 logger.warning("Cannot find collection with portable data hash %s", kr)
467                 continue
468             col = col["items"][0]
469             try:
470                 arvrunner.api.collections().create(body={"collection": {
471                     "owner_uuid": arvrunner.project_uuid,
472                     "name": col["name"],
473                     "description": col["description"],
474                     "properties": col["properties"],
475                     "portable_data_hash": col["portable_data_hash"],
476                     "manifest_text": col["manifest_text"],
477                     "storage_classes_desired": col["storage_classes_desired"],
478                     "trash_at": col["trash_at"]
479                 }}, ensure_unique_name=True).execute()
480             except Exception as e:
481                 logger.warning("Unable to copy collection to destination: %s", e)
482
483     if "$schemas" in workflowobj:
484         sch = CommentedSeq()
485         for s in workflowobj["$schemas"]:
486             if s in mapper:
487                 sch.append(mapper.mapper(s).resolved)
488         workflowobj["$schemas"] = sch
489
490     return mapper
491
492
493 def upload_docker(arvrunner, tool):
494     """Uploads Docker images used in CommandLineTool objects."""
495
496     if isinstance(tool, CommandLineTool):
497         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
498         if docker_req:
499             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
500                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
501                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
502
503             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
504                                                        arvrunner.runtimeContext.force_docker_pull,
505                                                        arvrunner.runtimeContext.tmp_outdir_prefix,
506                                                        arvrunner.runtimeContext.match_local_docker,
507                                                        arvrunner.runtimeContext.copy_deps)
508         else:
509             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
510                                                        True, arvrunner.project_uuid,
511                                                        arvrunner.runtimeContext.force_docker_pull,
512                                                        arvrunner.runtimeContext.tmp_outdir_prefix,
513                                                        arvrunner.runtimeContext.match_local_docker,
514                                                        arvrunner.runtimeContext.copy_deps)
515     elif isinstance(tool, cwltool.workflow.Workflow):
516         for s in tool.steps:
517             upload_docker(arvrunner, s.embedded_tool)
518
519
520 def packed_workflow(arvrunner, tool, merged_map):
521     """Create a packed workflow.
522
523     A "packed" workflow is one where all the components have been combined into a single document."""
524
525     rewrites = {}
526     packed = pack(arvrunner.loadingContext, tool.tool["id"],
527                   rewrite_out=rewrites,
528                   loader=tool.doc_loader)
529
530     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
531
532     def visit(v, cur_id):
533         if isinstance(v, dict):
534             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
535                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
536                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
537                 if "id" in v:
538                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
539             if "path" in v and "location" not in v:
540                 v["location"] = v["path"]
541                 del v["path"]
542             if "location" in v and cur_id in merged_map:
543                 if v["location"] in merged_map[cur_id].resolved:
544                     v["location"] = merged_map[cur_id].resolved[v["location"]]
545                 if v["location"] in merged_map[cur_id].secondaryFiles:
546                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
547             if v.get("class") == "DockerRequirement":
548                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
549                                                                                                              arvrunner.project_uuid,
550                                                                                                              arvrunner.runtimeContext.force_docker_pull,
551                                                                                                              arvrunner.runtimeContext.tmp_outdir_prefix,
552                                                                                                              arvrunner.runtimeContext.match_local_docker,
553                                                                                                              arvrunner.runtimeContext.copy_deps)
554             for l in v:
555                 visit(v[l], cur_id)
556         if isinstance(v, list):
557             for l in v:
558                 visit(l, cur_id)
559     visit(packed, None)
560     return packed
561
562
563 def tag_git_version(packed, tool):
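    """Record the git commit hash of the tool's source directory in the packed workflow, when the tool comes from a local file."""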
564     if tool.tool["id"].startswith("file://"):
565         path = os.path.dirname(tool.tool["id"][7:])
566         try:
567             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip().decode('utf-8')
568         except (OSError, subprocess.CalledProcessError):
569             pass
570         else:
571             packed["http://schema.org/version"] = githash
572
573
574 def upload_job_order(arvrunner, name, tool, job_order):
575     """Upload local files referenced in the input object and return updated input
576     object with 'location' updated to the proper keep references.
577     """
578
579     # Make a copy of the job order and set defaults.
580     builder_job_order = copy.copy(job_order)
581
582     # fill_in_defaults throws an error if there are any
583     # missing required parameters; we don't want it to do that,
584     # so make them all optional.
585     inputs_copy = copy.deepcopy(tool.tool["inputs"])
586     for i in inputs_copy:
587         if "null" not in i["type"]:
588             i["type"] = ["null"] + aslist(i["type"])
589
590     fill_in_defaults(inputs_copy,
591                      builder_job_order,
592                      arvrunner.fs_access)
593     # Need to create a builder object to evaluate expressions.
594     builder = make_builder(builder_job_order,
595                            tool.hints,
596                            tool.requirements,
597                            ArvRuntimeContext(),
598                            tool.metadata)
599     # Now update job_order with secondaryFiles
600     discover_secondary_files(arvrunner.fs_access,
601                              builder,
602                              tool.tool["inputs"],
603                              job_order)
604
605     jobmapper = upload_dependencies(arvrunner,
606                                     name,
607                                     tool.doc_loader,
608                                     job_order,
609                                     job_order.get("id", "#"),
610                                     False)
611
612     if "id" in job_order:
613         del job_order["id"]
614
615     # Need to filter this out; it gets added by cwltool when providing
616     # parameters on the command line.
617     if "job_order" in job_order:
618         del job_order["job_order"]
619
620     return job_order
621
622 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
623
624 def upload_workflow_deps(arvrunner, tool):
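    """Upload Docker images and file dependencies for every tool in the workflow.

    Returns a dict mapping each tool document's id to a FileUpdates tuple of
    resolved file locations and discovered secondaryFiles.
    """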
625     # Ensure that Docker images needed by this workflow are available
626
627     upload_docker(arvrunner, tool)
628
629     document_loader = tool.doc_loader
630
631     merged_map = {}
632
633     def upload_tool_deps(deptool):
634         if "id" in deptool:
635             discovered_secondaryfiles = {}
636             pm = upload_dependencies(arvrunner,
637                                      "%s dependencies" % (shortname(deptool["id"])),
638                                      document_loader,
639                                      deptool,
640                                      deptool["id"],
641                                      False,
642                                      include_primary=False,
643                                      discovered_secondaryfiles=discovered_secondaryfiles)
644             document_loader.idx[deptool["id"]] = deptool
645             toolmap = {}
646             for k,v in pm.items():
647                 toolmap[k] = v.resolved
648             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
649
650     tool.visit(upload_tool_deps)
651
652     return merged_map
653
654 def arvados_jobs_image(arvrunner, img):
655     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
656
657     try:
658         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
659                                                           arvrunner.runtimeContext.force_docker_pull,
660                                                           arvrunner.runtimeContext.tmp_outdir_prefix,
661                                                           arvrunner.runtimeContext.match_local_docker,
662                                                           arvrunner.runtimeContext.copy_deps)
663     except Exception as e:
664         raise Exception("Docker image %s is not available\n%s" % (img, e) )
665
666
667 def upload_workflow_collection(arvrunner, name, packed):
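    """Save the packed workflow as "workflow.cwl" in a Keep collection and return its portable data hash.

    Reuses an existing collection with identical content and a matching name
    when one is already present (in the destination project, if set).
    """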
668     collection = arvados.collection.Collection(api_client=arvrunner.api,
669                                                keep_client=arvrunner.keep_client,
670                                                num_retries=arvrunner.num_retries)
671     with collection.open("workflow.cwl", "w") as f:
672         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
673
674     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
675                ["name", "like", name+"%"]]
676     if arvrunner.project_uuid:
677         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
678     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
679
680     if exists["items"]:
681         logger.info("Using collection %s", exists["items"][0]["uuid"])
682     else:
683         collection.save_new(name=name,
684                             owner_uuid=arvrunner.project_uuid,
685                             ensure_unique_name=True,
686                             num_retries=arvrunner.num_retries)
687         logger.info("Uploaded to %s", collection.manifest_locator())
688
689     return collection.portable_data_hash()
690
691
692 class Runner(Process):
693     """Base class for runner processes, which submit an instance of
694     arvados-cwl-runner and wait for the final result."""
695
696     def __init__(self, runner, updated_tool,
697                  tool, loadingContext, enable_reuse,
698                  output_name, output_tags, submit_runner_ram=0,
699                  name=None, on_error=None, submit_runner_image=None,
700                  intermediate_output_ttl=0, merged_map=None,
701                  priority=None, secret_store=None,
702                  collection_cache_size=256,
703                  collection_cache_is_default=True):
704
705         loadingContext = loadingContext.copy()
706         loadingContext.metadata = updated_tool.metadata.copy()
707
708         super(Runner, self).__init__(updated_tool.tool, loadingContext)
709
710         self.arvrunner = runner
711         self.embedded_tool = tool
712         self.job_order = None
713         self.running = False
714         if enable_reuse:
715             # If reuse is permitted by command line arguments but
716             # disabled by the workflow itself, disable it.
717             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
718             if reuse_req:
719                 enable_reuse = reuse_req["enableReuse"]
720         self.enable_reuse = enable_reuse
721         self.uuid = None
722         self.final_output = None
723         self.output_name = output_name
724         self.output_tags = output_tags
725         self.name = name
726         self.on_error = on_error
727         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
728         self.intermediate_output_ttl = intermediate_output_ttl
729         self.priority = priority
730         self.secret_store = secret_store
731         self.enable_dev = loadingContext.enable_dev
732
733         self.submit_runner_cores = 1
734         self.submit_runner_ram = 1024  # default 1 GiB
735         self.collection_cache_size = collection_cache_size
736
737         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
738         if runner_resource_req:
739             if runner_resource_req.get("coresMin"):
740                 self.submit_runner_cores = runner_resource_req["coresMin"]
741             if runner_resource_req.get("ramMin"):
742                 self.submit_runner_ram = runner_resource_req["ramMin"]
743             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
744                 self.collection_cache_size = runner_resource_req["keep_cache"]
745
746         if submit_runner_ram:
747             # Command line / initializer overrides default and/or spec from workflow
748             self.submit_runner_ram = submit_runner_ram
749
750         if self.submit_runner_ram <= 0:
751             raise Exception("Value of submit-runner-ram must be greater than zero")
752
753         if self.submit_runner_cores <= 0:
754             raise Exception("Value of submit-runner-cores must be greater than zero")
755
756         self.merged_map = merged_map or {}
757
758     def job(self,
759             job_order,         # type: Mapping[Text, Text]
760             output_callbacks,  # type: Callable[[Any, Any], Any]
761             runtimeContext     # type: RuntimeContext
762            ):  # type: (...) -> Generator[Any, None, None]
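        """Set up the job order and yield this runner as the single runnable job."""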
763         self.job_order = job_order
764         self._init_job(job_order, runtimeContext)
765         yield self
766
767     def update_pipeline_component(self, record):
768         pass
769
770     def done(self, record):
771         """Base method for handling a completed runner."""
772
773         try:
774             if record["state"] == "Complete":
775                 if record.get("exit_code") is not None:
776                     if record["exit_code"] == 33:
777                         processStatus = "UnsupportedRequirement"
778                     elif record["exit_code"] == 0:
779                         processStatus = "success"
780                     else:
781                         processStatus = "permanentFail"
782                 else:
783                     processStatus = "success"
784             else:
785                 processStatus = "permanentFail"
786
787             outputs = {}
788
789             if processStatus == "permanentFail":
790                 logc = arvados.collection.CollectionReader(record["log"],
791                                                            api_client=self.arvrunner.api,
792                                                            keep_client=self.arvrunner.keep_client,
793                                                            num_retries=self.arvrunner.num_retries)
794                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
795
796             self.final_output = record["output"]
797             outc = arvados.collection.CollectionReader(self.final_output,
798                                                        api_client=self.arvrunner.api,
799                                                        keep_client=self.arvrunner.keep_client,
800                                                        num_retries=self.arvrunner.num_retries)
801             if "cwl.output.json" in outc:
802                 with outc.open("cwl.output.json", "rb") as f:
803                     if f.size() > 0:
804                         outputs = json.loads(f.read().decode())
805             def keepify(fileobj):
806                 path = fileobj["location"]
807                 if not path.startswith("keep:"):
808                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
809             adjustFileObjs(outputs, keepify)
810             adjustDirObjs(outputs, keepify)
811         except Exception:
812             logger.exception("[%s] While getting final output object", self.name)
813             self.arvrunner.output_callback({}, "permanentFail")
814         else:
815             self.arvrunner.output_callback(outputs, processStatus)