# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
import arvados.util
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

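    # Literal placeholder ids have the form "_:<random id>"; only those
    # placeholder locations are removed here, real locations are left alone.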
    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
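    """Return a cwltool Builder with just enough context to evaluate expressions.

    Used below to evaluate secondaryFiles patterns outside of an actual job.
    """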
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,               # type: Names
                 requirements=requirements,        # type: List[Dict[Text, Any]]
                 hints=hints,               # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,    # type: Optional[MutationManager]
                 formatgraph=None,         # type: Optional[Graph]
                 make_fs_access=None,      # type: Type[StdFsAccess]
                 fs_access=None,           # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,             # type: float
                 debug=runtimeContext.debug,               # type: bool
                 js_console=runtimeContext.js_console,          # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                 container_engine="docker"
                )

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
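    """Evaluate secondaryFiles patterns for a single input.

    Walks the input schema and the corresponding value from the job order in
    parallel, evaluating any applicable secondaryFiles specs and filling in
    primary["secondaryFiles"] with the files that actually exist.  Newly found
    files are also recorded in 'discovered', keyed by the primary location.
    """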
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)
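                # substitute() appends the pattern to the primary location
                # (e.g. ".bai" turns "x.bam" into "x.bam.bai"); a leading "^"
                # strips an extension from the primary first.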

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                                  loadref_fields,
                                  set(("$schemas",)),
                                  loadref, urljoin=document_loader.fetcher.urljoin,
                                  nestdirs=False)

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

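    # File/Directory values appearing in workflow defaults are gathered as
    # optional dependencies (defaults_are_optional below), alongside the
    # $schemas references collected above.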
    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)
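    # At this point mapper maps each uploaded local reference to a Keep
    # location of the form "keep:<portable data hash>/<path>", which setloc()
    # below writes back into workflowobj.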

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

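    # setloc rewrites each File/Directory location in place: freshly uploaded
    # local paths get their Keep reference from the pathmapper, and
    # collection-UUID style references are pinned to the portable data hash
    # resolved into uuid_map above.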
    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            addkeepref(p["location"])
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if arvrunner.runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                     filters=[["portable_data_hash", "in", list(keeprefs)],
                                              ["owner_uuid", "=", arvrunner.project_uuid]],
                                     select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": arvrunner.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
482                 logger.warning("Unable copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker,
                                                       arvrunner.runtimeContext.copy_deps)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker,
                                                       arvrunner.runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

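    # Walk the packed document: map file references through merged_map and
    # record the resolved Docker image PDH on each DockerRequirement.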
    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
538                 if "id" in v:
539                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
540             if "path" in v and "location" not in v:
541                 v["location"] = v["path"]
542                 del v["path"]
543             if "location" in v and cur_id in merged_map:
544                 if v["location"] in merged_map[cur_id].resolved:
545                     v["location"] = merged_map[cur_id].resolved[v["location"]]
546                 if v["location"] in merged_map[cur_id].secondaryFiles:
547                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
548             if v.get("class") == "DockerRequirement":
549                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
550                                                                                                              arvrunner.project_uuid,
551                                                                                                              arvrunner.runtimeContext.force_docker_pull,
552                                                                                                              arvrunner.runtimeContext.tmp_outdir_prefix,
553                                                                                                              arvrunner.runtimeContext.match_local_docker,
554                                                                                                              arvrunner.runtimeContext.copy_deps)
555             for l in v:
556                 visit(v[l], cur_id)
557         if isinstance(v, list):
558             for l in v:
559                 visit(l, cur_id)
560     visit(packed, None)
561     return packed
562
563
def tag_git_version(packed, tool):
565     if tool.tool["id"].startswith("file://"):
566         path = os.path.dirname(tool.tool["id"][7:])
567         try:
568             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
569         except (OSError, subprocess.CalledProcessError):
570             pass
571         else:
572             packed["http://schema.org/version"] = githash
573
574
575 def upload_job_order(arvrunner, name, tool, job_order):
576     """Upload local files referenced in the input object and return updated input
577     object with 'location' updated to the proper keep references.
578     """
579
580     # Make a copy of the job order and set defaults.
581     builder_job_order = copy.copy(job_order)
582
    # fill_in_defaults throws an error if any required parameters are
    # missing; we don't want that here, so make all of the inputs
    # optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when providing
    # parameters on the command line.
618     if "job_order" in job_order:
619         del job_order["job_order"]
620
621     return job_order
622
623 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
624
625 def upload_workflow_deps(arvrunner, tool):
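    """Upload the file dependencies of every tool in the workflow.

    Returns a dict keyed by tool "id", mapping to a FileUpdates tuple of
    (resolved location map, discovered secondaryFiles) for that tool.
    """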
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
                                                          arvrunner.runtimeContext.match_local_docker,
                                                          arvrunner.runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
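    """Save the packed workflow as "workflow.cwl" in a collection.

    Reuses an existing collection with the same portable data hash and name
    prefix if one is found; returns the collection's portable data hash.
    """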
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
777                     if record["exit_code"] == 33:
778                         processStatus = "UnsupportedRequirement"
779                     elif record["exit_code"] == 0:
780                         processStatus = "success"
781                     else:
782                         processStatus = "permanentFail"
783                 else:
784                     processStatus = "success"
785             else:
786                 processStatus = "permanentFail"
787
788             outputs = {}
789
790             if processStatus == "permanentFail":
791                 logc = arvados.collection.CollectionReader(record["log"],
792                                                            api_client=self.arvrunner.api,
793                                                            keep_client=self.arvrunner.keep_client,
794                                                            num_retries=self.arvrunner.num_retries)
795                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
796
797             self.final_output = record["output"]
798             outc = arvados.collection.CollectionReader(self.final_output,
799                                                        api_client=self.arvrunner.api,
800                                                        keep_client=self.arvrunner.keep_client,
801                                                        num_retries=self.arvrunner.num_retries)
802             if "cwl.output.json" in outc:
803                 with outc.open("cwl.output.json", "rb") as f:
804                     if f.size() > 0:
805                         outputs = json.loads(f.read().decode())
806             def keepify(fileobj):
807                 path = fileobj["location"]
808                 if not path.startswith("keep:"):
809                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
810             adjustFileObjs(outputs, keepify)
811             adjustDirObjs(outputs, keepify)
812         except Exception:
813             logger.exception("[%s] While getting final output object", self.name)
814             self.arvrunner.output_callback({}, "permanentFail")
815         else:
816             self.arvrunner.output_callback(outputs, processStatus)