19109: Avoid recursion fail mode
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 from ruamel.yaml import YAML
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from . context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
54 def trim_anonymous_location(obj):
55     """Remove 'location' field from File and Directory literals.
56
57     To make internal handling easier, literals are assigned a random id for
58     'location'.  However, when writing the record back out, this can break
59     reproducibility.  Since it is valid for literals not have a 'location'
60     field, remove it.
61
62     """
63
64     if obj.get("location", "").startswith("_:"):
65         del obj["location"]
66
67
68 def remove_redundant_fields(obj):
69     for field in ("path", "nameext", "nameroot", "dirname"):
70         if field in obj:
71             del obj[field]
72
73
74 def find_defaults(d, op):
75     if isinstance(d, list):
76         for i in d:
77             find_defaults(i, op)
78     elif isinstance(d, dict):
79         if "default" in d:
80             op(d)
81         else:
82             for i in viewvalues(d):
83                 find_defaults(i, op)
84
85 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
86     return Builder(
87                  job=joborder,
88                  files=[],               # type: List[Dict[Text, Text]]
89                  bindings=[],            # type: List[Dict[Text, Any]]
90                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
91                  names=None,               # type: Names
92                  requirements=requirements,        # type: List[Dict[Text, Any]]
93                  hints=hints,               # type: List[Dict[Text, Any]]
94                  resources={},           # type: Dict[str, int]
95                  mutation_manager=None,    # type: Optional[MutationManager]
96                  formatgraph=None,         # type: Optional[Graph]
97                  make_fs_access=None,      # type: Type[StdFsAccess]
98                  fs_access=None,           # type: StdFsAccess
99                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
100                  timeout=runtimeContext.eval_timeout,             # type: float
101                  debug=runtimeContext.debug,               # type: bool
102                  js_console=runtimeContext.js_console,          # type: bool
103                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
104                  loadListing="",         # type: Text
105                  outdir="",              # type: Text
106                  tmpdir="",              # type: Text
107                  stagedir="",            # type: Text
108                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
109                  container_engine="docker"
110                 )
111
112 def search_schemadef(name, reqs):
113     for r in reqs:
114         if r["class"] == "SchemaDefRequirement":
115             for sd in r["types"]:
116                 if sd["name"] == name:
117                     return sd
118     return None
119
120 primitive_types_set = frozenset(("null", "boolean", "int", "long",
121                                  "float", "double", "string", "record",
122                                  "array", "enum"))
123
124 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
125     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
126         # union type, collect all possible secondaryFiles
127         for i in inputschema:
128             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
129         return
130
131     if inputschema == "File":
132         inputschema = {"type": "File"}
133
134     if isinstance(inputschema, basestring):
135         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
136         if sd:
137             inputschema = sd
138         else:
139             return
140
141     if "secondaryFiles" in inputschema:
142         # set secondaryFiles, may be inherited by compound types.
143         secondaryspec = inputschema["secondaryFiles"]
144
145     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
146         not isinstance(inputschema["type"], basestring)):
147         # compound type (union, array, record)
148         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
149
150     elif (inputschema["type"] == "record" and
151           isinstance(primary, Mapping)):
152         #
153         # record type, find secondary files associated with fields.
154         #
155         for f in inputschema["fields"]:
156             p = primary.get(shortname(f["name"]))
157             if p:
158                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
159
160     elif (inputschema["type"] == "array" and
161           isinstance(primary, Sequence)):
162         #
163         # array type, find secondary files of elements
164         #
165         for p in primary:
166             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
167
168     elif (inputschema["type"] == "File" and
169           isinstance(primary, Mapping) and
170           primary.get("class") == "File"):
171
172         if "secondaryFiles" in primary or not secondaryspec:
173             # Nothing to do.
174             return
175
176         #
177         # Found a file, check for secondaryFiles
178         #
179         specs = []
180         primary["secondaryFiles"] = secondaryspec
181         for i, sf in enumerate(aslist(secondaryspec)):
182             if builder.cwlVersion == "v1.0":
183                 pattern = sf
184             else:
185                 pattern = sf["pattern"]
186             if pattern is None:
187                 continue
188             if isinstance(pattern, list):
189                 specs.extend(pattern)
190             elif isinstance(pattern, dict):
191                 specs.append(pattern)
192             elif isinstance(pattern, str):
193                 if builder.cwlVersion == "v1.0":
194                     specs.append({"pattern": pattern, "required": True})
195                 else:
196                     specs.append({"pattern": pattern, "required": sf.get("required")})
197             else:
198                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
199                     "Expression must return list, object, string or null")
200
201         found = []
202         for i, sf in enumerate(specs):
203             if isinstance(sf, dict):
204                 if sf.get("class") == "File":
205                     pattern = None
206                     if sf.get("location") is None:
207                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
208                             "File object is missing 'location': %s" % sf)
209                     sfpath = sf["location"]
210                     required = True
211                 else:
212                     pattern = sf["pattern"]
213                     required = sf.get("required")
214             elif isinstance(sf, str):
215                 pattern = sf
216                 required = True
217             else:
218                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
219                     "Expression must return list, object, string or null")
220
221             if pattern is not None:
222                 if "${" in pattern or "$(" in pattern:
223                     sfname = builder.do_eval(pattern, context=primary)
224                 else:
225                     sfname = substitute(primary["basename"], pattern)
226
227                 if sfname is None:
228                     continue
229
230                 p_location = primary["location"]
231                 if "/" in p_location:
232                     sfpath = (
233                         p_location[0 : p_location.rindex("/") + 1]
234                         + sfname
235                     )
236
237             required = builder.do_eval(required, context=primary)
238
239             if fsaccess.exists(sfpath):
240                 if pattern is not None:
241                     found.append({"location": sfpath, "class": "File"})
242                 else:
243                     found.append(sf)
244             elif required:
245                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
246                     "Required secondary file '%s' does not exist" % sfpath)
247
248         primary["secondaryFiles"] = cmap(found)
249         if discovered is not None:
250             discovered[primary["location"]] = primary["secondaryFiles"]
251     elif inputschema["type"] not in primitive_types_set:
252         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
253
254 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
255     for inputschema in inputs:
256         primary = job_order.get(shortname(inputschema["id"]))
257         if isinstance(primary, (Mapping, Sequence)):
258             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
259
260 def upload_dependencies(arvrunner, name, document_loader,
261                         workflowobj, uri, loadref_run,
262                         include_primary=True, discovered_secondaryfiles=None):
263     """Upload the dependencies of the workflowobj document to Keep.
264
265     Returns a pathmapper object mapping local paths to keep references.  Also
266     does an in-place update of references in "workflowobj".
267
268     Use scandeps to find $import, $include, $schemas, run, File and Directory
269     fields that represent external references.
270
271     If workflowobj has an "id" field, this will reload the document to ensure
272     it is scanning the raw document prior to preprocessing.
273     """
274
275     loaded = set()
276     def loadref(b, u):
277         joined = document_loader.fetcher.urljoin(b, u)
278         defrg, _ = urllib.parse.urldefrag(joined)
279         if defrg not in loaded:
280             loaded.add(defrg)
281             # Use fetch_text to get raw file (before preprocessing).
282             text = document_loader.fetch_text(defrg)
283             if isinstance(text, bytes):
284                 textIO = StringIO(text.decode('utf-8'))
285             else:
286                 textIO = StringIO(text)
287             yamlloader = YAML(typ='safe', pure=True)
288             return yamlloader.load(textIO)
289         else:
290             return {}
291
292     if loadref_run:
293         loadref_fields = set(("$import", "run"))
294     else:
295         loadref_fields = set(("$import",))
296
297     scanobj = workflowobj
298     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
299         # Need raw file content (before preprocessing) to ensure
300         # that external references in $include and $mixin are captured.
301         scanobj = loadref("", workflowobj["id"])
302
303     metadata = scanobj
304
305     sc_result = scandeps(uri, scanobj,
306                          loadref_fields,
307                          set(("$include", "location")),
308                          loadref, urljoin=document_loader.fetcher.urljoin,
309                          nestdirs=False)
310
311     optional_deps = scandeps(uri, scanobj,
312                                   loadref_fields,
313                                   set(("$schemas",)),
314                                   loadref, urljoin=document_loader.fetcher.urljoin,
315                                   nestdirs=False)
316
317     sc_result.extend(optional_deps)
318
319     sc = []
320     uuids = {}
321
322     def collect_uuids(obj):
323         loc = obj.get("location", "")
324         sp = loc.split(":")
325         if sp[0] == "keep":
326             # Collect collection uuids that need to be resolved to
327             # portable data hashes
328             gp = collection_uuid_pattern.match(loc)
329             if gp:
330                 uuids[gp.groups()[0]] = obj
331             if collectionUUID in obj:
332                 uuids[obj[collectionUUID]] = obj
333
334     def collect_uploads(obj):
335         loc = obj.get("location", "")
336         sp = loc.split(":")
337         if len(sp) < 1:
338             return
339         if sp[0] in ("file", "http", "https"):
340             # Record local files than need to be uploaded,
341             # don't include file literals, keep references, etc.
342             sc.append(obj)
343         collect_uuids(obj)
344
345     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
346     visit_class(sc_result, ("File", "Directory"), collect_uploads)
347
348     # Resolve any collection uuids we found to portable data hashes
349     # and assign them to uuid_map
350     uuid_map = {}
351     fetch_uuids = list(uuids.keys())
352     while fetch_uuids:
353         # For a large number of fetch_uuids, API server may limit
354         # response size, so keep fetching from API server has nothing
355         # more to give us.
356         lookups = arvrunner.api.collections().list(
357             filters=[["uuid", "in", fetch_uuids]],
358             count="none",
359             select=["uuid", "portable_data_hash"]).execute(
360                 num_retries=arvrunner.num_retries)
361
362         if not lookups["items"]:
363             break
364
365         for l in lookups["items"]:
366             uuid_map[l["uuid"]] = l["portable_data_hash"]
367
368         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
369
370     normalizeFilesDirs(sc)
371
372     if include_primary and "id" in workflowobj:
373         sc.append({"class": "File", "location": workflowobj["id"]})
374
375     def visit_default(obj):
376         def defaults_are_optional(f):
377             if "location" not in f and "path" in f:
378                 f["location"] = f["path"]
379                 del f["path"]
380             normalizeFilesDirs(f)
381             optional_deps.append(f)
382         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
383
384     find_defaults(workflowobj, visit_default)
385
386     discovered = {}
387     def discover_default_secondary_files(obj):
388         builder_job_order = {}
389         for t in obj["inputs"]:
390             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
391         # Need to create a builder object to evaluate expressions.
392         builder = make_builder(builder_job_order,
393                                obj.get("hints", []),
394                                obj.get("requirements", []),
395                                ArvRuntimeContext(),
396                                metadata)
397         discover_secondary_files(arvrunner.fs_access,
398                                  builder,
399                                  obj["inputs"],
400                                  builder_job_order,
401                                  discovered)
402
403     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
404     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
405
406     for d in list(discovered):
407         # Only interested in discovered secondaryFiles which are local
408         # files that need to be uploaded.
409         if d.startswith("file:"):
410             sc.extend(discovered[d])
411         else:
412             del discovered[d]
413
414     mapper = ArvPathMapper(arvrunner, sc, "",
415                            "keep:%s",
416                            "keep:%s/%s",
417                            name=name,
418                            single_collection=True,
419                            optional_deps=optional_deps)
420
421     def setloc(p):
422         loc = p.get("location")
423         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
424             p["location"] = mapper.mapper(p["location"]).resolved
425             return
426
427         if not loc:
428             return
429
430         if collectionUUID in p:
431             uuid = p[collectionUUID]
432             if uuid not in uuid_map:
433                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
434                     "Collection uuid %s not found" % uuid)
435             gp = collection_pdh_pattern.match(loc)
436             if gp and uuid_map[uuid] != gp.groups()[0]:
437                 # This file entry has both collectionUUID and a PDH
438                 # location. If the PDH doesn't match the one returned
439                 # the API server, raise an error.
440                 raise SourceLine(p, "location", validate.ValidationException).makeError(
441                     "Expected collection uuid %s to be %s but API server reported %s" % (
442                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
443
444         gp = collection_uuid_pattern.match(loc)
445         if not gp:
446             return
447         uuid = gp.groups()[0]
448         if uuid not in uuid_map:
449             raise SourceLine(p, "location", validate.ValidationException).makeError(
450                 "Collection uuid %s not found" % uuid)
451         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
452         p[collectionUUID] = uuid
453
454     visit_class(workflowobj, ("File", "Directory"), setloc)
455     visit_class(discovered, ("File", "Directory"), setloc)
456
457     if discovered_secondaryfiles is not None:
458         for d in discovered:
459             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
460
461     if "$schemas" in workflowobj:
462         sch = CommentedSeq()
463         for s in workflowobj["$schemas"]:
464             if s in mapper:
465                 sch.append(mapper.mapper(s).resolved)
466         workflowobj["$schemas"] = sch
467
468     return mapper
469
470
471 def upload_docker(arvrunner, tool):
472     """Uploads Docker images used in CommandLineTool objects."""
473
474     if isinstance(tool, CommandLineTool):
475         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
476         if docker_req:
477             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
478                 # TODO: can be supported by containers API, but not jobs API.
479                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
480                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
481             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
482                                                        arvrunner.runtimeContext.force_docker_pull,
483                                                        arvrunner.runtimeContext.tmp_outdir_prefix,
484                                                        arvrunner.runtimeContext.match_local_docker)
485         else:
486             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
487                                                        True, arvrunner.project_uuid,
488                                                        arvrunner.runtimeContext.force_docker_pull,
489                                                        arvrunner.runtimeContext.tmp_outdir_prefix,
490                                                        arvrunner.runtimeContext.match_local_docker)
491     elif isinstance(tool, cwltool.workflow.Workflow):
492         for s in tool.steps:
493             upload_docker(arvrunner, s.embedded_tool)
494
495
496 def packed_workflow(arvrunner, tool, merged_map):
497     """Create a packed workflow.
498
499     A "packed" workflow is one where all the components have been combined into a single document."""
500
501     rewrites = {}
502     packed = pack(arvrunner.loadingContext, tool.tool["id"],
503                   rewrite_out=rewrites,
504                   loader=tool.doc_loader)
505
506     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
507
508     def visit(v, cur_id):
509         if isinstance(v, dict):
510             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
511                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
512                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
513                 if "id" in v:
514                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
515             if "path" in v and "location" not in v:
516                 v["location"] = v["path"]
517                 del v["path"]
518             if "location" in v and cur_id in merged_map:
519                 if v["location"] in merged_map[cur_id].resolved:
520                     v["location"] = merged_map[cur_id].resolved[v["location"]]
521                 if v["location"] in merged_map[cur_id].secondaryFiles:
522                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
523             if v.get("class") == "DockerRequirement":
524                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
525                                                                                                              arvrunner.project_uuid,
526                                                                                                              arvrunner.runtimeContext.force_docker_pull,
527                                                                                                              arvrunner.runtimeContext.tmp_outdir_prefix,
528                                                                                                              arvrunner.runtimeContext.match_local_docker)
529             for l in v:
530                 visit(v[l], cur_id)
531         if isinstance(v, list):
532             for l in v:
533                 visit(l, cur_id)
534     visit(packed, None)
535     return packed
536
537
538 def tag_git_version(packed):
539     if tool.tool["id"].startswith("file://"):
540         path = os.path.dirname(tool.tool["id"][7:])
541         try:
542             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
543         except (OSError, subprocess.CalledProcessError):
544             pass
545         else:
546             packed["http://schema.org/version"] = githash
547
548
549 def upload_job_order(arvrunner, name, tool, job_order):
550     """Upload local files referenced in the input object and return updated input
551     object with 'location' updated to the proper keep references.
552     """
553
554     # Make a copy of the job order and set defaults.
555     builder_job_order = copy.copy(job_order)
556
557     # fill_in_defaults throws an error if there are any
558     # missing required parameters, we don't want it to do that
559     # so make them all optional.
560     inputs_copy = copy.deepcopy(tool.tool["inputs"])
561     for i in inputs_copy:
562         if "null" not in i["type"]:
563             i["type"] = ["null"] + aslist(i["type"])
564
565     fill_in_defaults(inputs_copy,
566                      builder_job_order,
567                      arvrunner.fs_access)
568     # Need to create a builder object to evaluate expressions.
569     builder = make_builder(builder_job_order,
570                            tool.hints,
571                            tool.requirements,
572                            ArvRuntimeContext(),
573                            tool.metadata)
574     # Now update job_order with secondaryFiles
575     discover_secondary_files(arvrunner.fs_access,
576                              builder,
577                              tool.tool["inputs"],
578                              job_order)
579
580     jobmapper = upload_dependencies(arvrunner,
581                                     name,
582                                     tool.doc_loader,
583                                     job_order,
584                                     job_order.get("id", "#"),
585                                     False)
586
587     if "id" in job_order:
588         del job_order["id"]
589
590     # Need to filter this out, gets added by cwltool when providing
591     # parameters on the command line.
592     if "job_order" in job_order:
593         del job_order["job_order"]
594
595     return job_order
596
597 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
598
599 def upload_workflow_deps(arvrunner, tool):
600     # Ensure that Docker images needed by this workflow are available
601
602     upload_docker(arvrunner, tool)
603
604     document_loader = tool.doc_loader
605
606     merged_map = {}
607
608     def upload_tool_deps(deptool):
609         if "id" in deptool:
610             discovered_secondaryfiles = {}
611             pm = upload_dependencies(arvrunner,
612                                      "%s dependencies" % (shortname(deptool["id"])),
613                                      document_loader,
614                                      deptool,
615                                      deptool["id"],
616                                      False,
617                                      include_primary=False,
618                                      discovered_secondaryfiles=discovered_secondaryfiles)
619             document_loader.idx[deptool["id"]] = deptool
620             toolmap = {}
621             for k,v in pm.items():
622                 toolmap[k] = v.resolved
623             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
624
625     tool.visit(upload_tool_deps)
626
627     return merged_map
628
629 def arvados_jobs_image(arvrunner, img):
630     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
631
632     try:
633         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
634                                                           arvrunner.runtimeContext.force_docker_pull,
635                                                           arvrunner.runtimeContext.tmp_outdir_prefix,
636                                                           arvrunner.runtimeContext.match_local_docker)
637     except Exception as e:
638         raise Exception("Docker image %s is not available\n%s" % (img, e) )
639
640
641 def upload_workflow_collection(arvrunner, name, packed):
642     collection = arvados.collection.Collection(api_client=arvrunner.api,
643                                                keep_client=arvrunner.keep_client,
644                                                num_retries=arvrunner.num_retries)
645     with collection.open("workflow.cwl", "w") as f:
646         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
647
648     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
649                ["name", "like", name+"%"]]
650     if arvrunner.project_uuid:
651         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
652     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
653
654     if exists["items"]:
655         logger.info("Using collection %s", exists["items"][0]["uuid"])
656     else:
657         collection.save_new(name=name,
658                             owner_uuid=arvrunner.project_uuid,
659                             ensure_unique_name=True,
660                             num_retries=arvrunner.num_retries)
661         logger.info("Uploaded to %s", collection.manifest_locator())
662
663     return collection.portable_data_hash()
664
665
666 class Runner(Process):
667     """Base class for runner processes, which submit an instance of
668     arvados-cwl-runner and wait for the final result."""
669
670     def __init__(self, runner, updated_tool,
671                  tool, loadingContext, enable_reuse,
672                  output_name, output_tags, submit_runner_ram=0,
673                  name=None, on_error=None, submit_runner_image=None,
674                  intermediate_output_ttl=0, merged_map=None,
675                  priority=None, secret_store=None,
676                  collection_cache_size=256,
677                  collection_cache_is_default=True):
678
679         loadingContext = loadingContext.copy()
680         loadingContext.metadata = updated_tool.metadata.copy()
681
682         super(Runner, self).__init__(updated_tool.tool, loadingContext)
683
684         self.arvrunner = runner
685         self.embedded_tool = tool
686         self.job_order = None
687         self.running = False
688         if enable_reuse:
689             # If reuse is permitted by command line arguments but
690             # disabled by the workflow itself, disable it.
691             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
692             if reuse_req:
693                 enable_reuse = reuse_req["enableReuse"]
694         self.enable_reuse = enable_reuse
695         self.uuid = None
696         self.final_output = None
697         self.output_name = output_name
698         self.output_tags = output_tags
699         self.name = name
700         self.on_error = on_error
701         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
702         self.intermediate_output_ttl = intermediate_output_ttl
703         self.priority = priority
704         self.secret_store = secret_store
705         self.enable_dev = loadingContext.enable_dev
706
707         self.submit_runner_cores = 1
708         self.submit_runner_ram = 1024  # defaut 1 GiB
709         self.collection_cache_size = collection_cache_size
710
711         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
712         if runner_resource_req:
713             if runner_resource_req.get("coresMin"):
714                 self.submit_runner_cores = runner_resource_req["coresMin"]
715             if runner_resource_req.get("ramMin"):
716                 self.submit_runner_ram = runner_resource_req["ramMin"]
717             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
718                 self.collection_cache_size = runner_resource_req["keep_cache"]
719
720         if submit_runner_ram:
721             # Command line / initializer overrides default and/or spec from workflow
722             self.submit_runner_ram = submit_runner_ram
723
724         if self.submit_runner_ram <= 0:
725             raise Exception("Value of submit-runner-ram must be greater than zero")
726
727         if self.submit_runner_cores <= 0:
728             raise Exception("Value of submit-runner-cores must be greater than zero")
729
730         self.merged_map = merged_map or {}
731
732     def job(self,
733             job_order,         # type: Mapping[Text, Text]
734             output_callbacks,  # type: Callable[[Any, Any], Any]
735             runtimeContext     # type: RuntimeContext
736            ):  # type: (...) -> Generator[Any, None, None]
737         self.job_order = job_order
738         self._init_job(job_order, runtimeContext)
739         yield self
740
741     def update_pipeline_component(self, record):
742         pass
743
744     def done(self, record):
745         """Base method for handling a completed runner."""
746
747         try:
748             if record["state"] == "Complete":
749                 if record.get("exit_code") is not None:
750                     if record["exit_code"] == 33:
751                         processStatus = "UnsupportedRequirement"
752                     elif record["exit_code"] == 0:
753                         processStatus = "success"
754                     else:
755                         processStatus = "permanentFail"
756                 else:
757                     processStatus = "success"
758             else:
759                 processStatus = "permanentFail"
760
761             outputs = {}
762
763             if processStatus == "permanentFail":
764                 logc = arvados.collection.CollectionReader(record["log"],
765                                                            api_client=self.arvrunner.api,
766                                                            keep_client=self.arvrunner.keep_client,
767                                                            num_retries=self.arvrunner.num_retries)
768                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
769
770             self.final_output = record["output"]
771             outc = arvados.collection.CollectionReader(self.final_output,
772                                                        api_client=self.arvrunner.api,
773                                                        keep_client=self.arvrunner.keep_client,
774                                                        num_retries=self.arvrunner.num_retries)
775             if "cwl.output.json" in outc:
776                 with outc.open("cwl.output.json", "rb") as f:
777                     if f.size() > 0:
778                         outputs = json.loads(f.read().decode())
779             def keepify(fileobj):
780                 path = fileobj["location"]
781                 if not path.startswith("keep:"):
782                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
783             adjustFileObjs(outputs, keepify)
784             adjustDirObjs(outputs, keepify)
785         except Exception:
786             logger.exception("[%s] While getting final output object", self.name)
787             self.arvrunner.output_callback({}, "permanentFail")
788         else:
789             self.arvrunner.output_callback(outputs, processStatus)