# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
import arvados.util
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


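# Illustrative sketch for trim_anonymous_location above (not part of the
# original module; values are hypothetical).  Anonymous literals carry a
# random "_:"-prefixed id that is dropped on output:
#
#   obj = {"class": "File", "location": "_:c6aee7b4", "basename": "a.txt",
#          "contents": "hello"}
#   trim_anonymous_location(obj)
#   # -> {"class": "File", "basename": "a.txt", "contents": "hello"}
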
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,               # type: Names
                 requirements=requirements,        # type: List[Dict[Text, Any]]
                 hints=hints,               # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,    # type: Optional[MutationManager]
                 formatgraph=None,         # type: Optional[Graph]
                 make_fs_access=None,      # type: Type[StdFsAccess]
                 fs_access=None,           # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,             # type: float
                 debug=runtimeContext.debug,               # type: bool
                 js_console=runtimeContext.js_console,          # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                 container_engine="docker"
                )

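# Minimal usage sketch for make_builder (not part of the original module;
# assumes a loaded tool object `tool` and hypothetical input values).  The
# returned Builder is only used here to evaluate CWL expressions:
#
#   builder = make_builder(
#       {"inp": {"class": "File", "location": "keep:.../a.bam", "basename": "a.bam"}},
#       tool.hints, tool.requirements, ArvRuntimeContext(), tool.metadata)
#   builder.do_eval("$(inputs.inp.basename)")   # -> "a.bam"
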
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

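# Illustrative sketch of what discover_secondary_files does (not part of the
# original module; the names fs_access/builder and the paths are hypothetical).
# Given an input schema with a secondaryFiles pattern and a primary File in the
# job order, the matching secondary file is looked up via the fs access object
# and recorded on the primary File (and in `discovered`, keyed by the primary's
# location):
#
#   inputs    = [{"id": "#main/ref", "type": "File",
#                 "secondaryFiles": [{"pattern": ".fai", "required": True}]}]
#   job_order = {"ref": {"class": "File", "location": "file:///data/ref.fa"}}
#   discover_secondary_files(fs_access, builder, inputs, job_order, discovered={})
#   # -> job_order["ref"]["secondaryFiles"] == [{"class": "File",
#   #        "location": "file:///data/ref.fa.fai"}]   (if that file exists)
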
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit
        # the response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            addkeepref(p["location"])
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

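    # Sketch of the rewrites setloc performs below (illustrative, hypothetical
    # values): local files are replaced with the keep: locations chosen by the
    # path mapper, and collection-UUID references are pinned to the portable
    # data hash resolved above, with the original UUID kept in collectionUUID.
    #
    #   file:///home/me/input.txt
    #     -> keep:99999999999999999999999999999998+42/input.txt
    #   keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/input.txt
    #     -> keep:99999999999999999999999999999998+42/input.txt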
    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                     filters=[["portable_data_hash", "in", list(keeprefs)],
                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
                                     select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

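# Minimal usage sketch for upload_dependencies above (not part of the original
# module; arvrunner, tool, runtimeContext and the paths are hypothetical):
#
#   mapper = upload_dependencies(arvrunner, "my workflow", tool.doc_loader,
#                                tool.tool, tool.tool["id"], False, runtimeContext)
#   mapper.mapper("file:///home/me/wf/script.py").resolved
#   # -> "keep:<portable data hash>/script.py"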

def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)


def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext.project_uuid,
                                                                                                             runtimeContext.force_docker_pull,
                                                                                                             runtimeContext.tmp_outdir_prefix,
                                                                                                             runtimeContext.match_local_docker,
                                                                                                             runtimeContext.copy_deps)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

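# Sketch of the shape of a packed workflow returned above (abridged and
# hypothetical ids; not part of the original module).  All referenced tools
# are inlined under $graph so the result is a single document:
#
#   {"cwlVersion": "v1.2",
#    "$graph": [
#        {"class": "Workflow", "id": "#main", "steps": [...]},
#        {"class": "CommandLineTool", "id": "#step1.cwl", ...}]}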

def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, and we don't want that
    # here, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False,
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

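# Illustrative before/after for upload_job_order above (hypothetical paths;
# not part of the original module).  Local files in the input object are
# uploaded and relocated to keep references, and discovered secondaryFiles
# are filled in:
#
#   {"reads": {"class": "File", "location": "file:///data/sample.fastq"}}
#     -> {"reads": {"class": "File",
#            "location": "keep:99999999999999999999999999999998+42/sample.fastq"}}
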
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     runtimeContext,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

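# Sketch of the merged_map structure returned by upload_workflow_deps above
# (hypothetical values; not part of the original module): one FileUpdates
# entry per tool document, mapping original locations to keep references plus
# any discovered secondaryFiles.
#
#   {"file:///home/me/wf/main.cwl": FileUpdates(
#        resolved={"file:///home/me/wf/blast.db": "keep:...+42/blast.db"},
#        secondaryFiles={})}
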
def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True,
                                                          runtimeContext.project_uuid,
                                                          runtimeContext.force_docker_pull,
                                                          runtimeContext.tmp_outdir_prefix,
                                                          runtimeContext.match_local_docker,
                                                          runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

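# Usage sketch for upload_workflow_collection above (hypothetical values; not
# part of the original module).  The packed workflow is stored as workflow.cwl
# in a new collection, unless a collection with the same portable data hash
# and name prefix already exists in the target project:
#
#   pdh = upload_workflow_collection(arvrunner, "my-workflow", packed, runtimeContext)
#   # -> e.g. "99999999999999999999999999999998+42"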

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

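    # Example of the http://arvados.org/cwl#WorkflowRunnerResources hint read
    # in __init__ above (CWL YAML; illustrative values only, and it assumes
    # the workflow declares $namespaces: {arv: "http://arvados.org/cwl#"}):
    #
    #   hints:
    #     arv:WorkflowRunnerResources:
    #       coresMin: 2
    #       ramMin: 2048
    #       keep_cache: 512
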
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)