1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 from ruamel.yaml import YAML
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from .context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
54 def trim_anonymous_location(obj):
55     """Remove 'location' field from File and Directory literals.
56
57     To make internal handling easier, literals are assigned a random id for
58     'location'.  However, when writing the record back out, this can break
59     reproducibility.  Since it is valid for literals not to have a 'location'
60     field, remove it.
61
62     """
63
64     if obj.get("location", "").startswith("_:"):
65         del obj["location"]
66
67
68 def remove_redundant_fields(obj):
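    """Strip derived fields ('path', 'nameext', 'nameroot', 'dirname') from obj if present."""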
69     for field in ("path", "nameext", "nameroot", "dirname"):
70         if field in obj:
71             del obj[field]
72
73
74 def find_defaults(d, op):
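    """Recursively search d and call op() on every dict that has a 'default' field."""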
75     if isinstance(d, list):
76         for i in d:
77             find_defaults(i, op)
78     elif isinstance(d, dict):
79         if "default" in d:
80             op(d)
81         else:
82             for i in viewvalues(d):
83                 find_defaults(i, op)
84
85 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
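    """Return a cwltool Builder carrying just enough state (job order, hints,
    requirements and runtime options) to evaluate CWL expressions."""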
86     return Builder(
87                  job=joborder,
88                  files=[],               # type: List[Dict[Text, Text]]
89                  bindings=[],            # type: List[Dict[Text, Any]]
90                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
91                  names=None,               # type: Names
92                  requirements=requirements,        # type: List[Dict[Text, Any]]
93                  hints=hints,               # type: List[Dict[Text, Any]]
94                  resources={},           # type: Dict[str, int]
95                  mutation_manager=None,    # type: Optional[MutationManager]
96                  formatgraph=None,         # type: Optional[Graph]
97                  make_fs_access=None,      # type: Type[StdFsAccess]
98                  fs_access=None,           # type: StdFsAccess
99                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
100                  timeout=runtimeContext.eval_timeout,             # type: float
101                  debug=runtimeContext.debug,               # type: bool
102                  js_console=runtimeContext.js_console,          # type: bool
103                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
104                  loadListing="",         # type: Text
105                  outdir="",              # type: Text
106                  tmpdir="",              # type: Text
107                  stagedir="",            # type: Text
108                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
109                  container_engine="docker"
110                 )
111
112 def search_schemadef(name, reqs):
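    """Return the type definition named 'name' from a SchemaDefRequirement in reqs, or None."""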
113     for r in reqs:
114         if r["class"] == "SchemaDefRequirement":
115             for sd in r["types"]:
116                 if sd["name"] == name:
117                     return sd
118     return None
119
120 primitive_types_set = frozenset(("null", "boolean", "int", "long",
121                                  "float", "double", "string", "record",
122                                  "array", "enum"))
123
124 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
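    """Recursively expand the secondaryFiles of an input value.

    Walks the input schema (unions, records, arrays, named schema defs)
    alongside the corresponding value from the job order.  For each File
    found, every secondaryFiles pattern is evaluated (CWL expressions via
    builder.do_eval(), plain patterns via substitute()) and checked with
    fsaccess.exists(); the results replace the File's secondaryFiles and,
    when 'discovered' is a dict, are also recorded there keyed by the
    primary file's location.  A required secondary file that cannot be
    found raises a ValidationException.

    For example, the pattern ".tbi" applied to basename "calls.vcf.gz"
    yields "calls.vcf.gz.tbi", while "^.tbi" strips the last extension
    first, yielding "calls.vcf.tbi".
    """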
125     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
126         # union type, collect all possible secondaryFiles
127         for i in inputschema:
128             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
129         return
130
131     if inputschema == "File":
132         inputschema = {"type": "File"}
133
134     if isinstance(inputschema, basestring):
135         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
136         if sd:
137             inputschema = sd
138         else:
139             return
140
141     if "secondaryFiles" in inputschema:
142         # set secondaryFiles, may be inherited by compound types.
143         secondaryspec = inputschema["secondaryFiles"]
144
145     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
146         not isinstance(inputschema["type"], basestring)):
147         # compound type (union, array, record)
148         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
149
150     elif (inputschema["type"] == "record" and
151           isinstance(primary, Mapping)):
152         #
153         # record type, find secondary files associated with fields.
154         #
155         for f in inputschema["fields"]:
156             p = primary.get(shortname(f["name"]))
157             if p:
158                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
159
160     elif (inputschema["type"] == "array" and
161           isinstance(primary, Sequence)):
162         #
163         # array type, find secondary files of elements
164         #
165         for p in primary:
166             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
167
168     elif (inputschema["type"] == "File" and
169           secondaryspec and
170           isinstance(primary, Mapping) and
171           primary.get("class") == "File" and
172           "secondaryFiles" not in primary):
173         #
174         # Found a file, check for secondaryFiles
175         #
176         specs = []
177         primary["secondaryFiles"] = secondaryspec
178         for i, sf in enumerate(aslist(secondaryspec)):
179             if builder.cwlVersion == "v1.0":
180                 pattern = sf
181             else:
182                 pattern = sf["pattern"]
183             if pattern is None:
184                 continue
185             if isinstance(pattern, list):
186                 specs.extend(pattern)
187             elif isinstance(pattern, dict):
188                 specs.append(pattern)
189             elif isinstance(pattern, str):
190                 if builder.cwlVersion == "v1.0":
191                     specs.append({"pattern": pattern, "required": True})
192                 else:
193                     specs.append({"pattern": pattern, "required": sf.get("required")})
194             else:
195                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
196                     "Expression must return list, object, string or null")
197
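        # Each entry in specs is now either a {"pattern": ..., "required": ...}
        # dict, a plain pattern string, or a File object produced by an
        # already-evaluated expression; resolve each form below.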
198         found = []
199         for i, sf in enumerate(specs):
200             if isinstance(sf, dict):
201                 if sf.get("class") == "File":
202                     pattern = None
203                     if sf.get("location") is None:
204                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
205                             "File object is missing 'location': %s" % sf)
206                     sfpath = sf["location"]
207                     required = True
208                 else:
209                     pattern = sf["pattern"]
210                     required = sf.get("required")
211             elif isinstance(sf, str):
212                 pattern = sf
213                 required = True
214             else:
215                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
216                     "Expression must return list, object, string or null")
217
218             if pattern is not None:
219                 if "${" in pattern or "$(" in pattern:
220                     sfname = builder.do_eval(pattern, context=primary)
221                 else:
222                     sfname = substitute(primary["basename"], pattern)
223
224                 if sfname is None:
225                     continue
226
227                 p_location = primary["location"]
228                 if "/" in p_location:
229                     sfpath = (
230                         p_location[0 : p_location.rindex("/") + 1]
231                         + sfname
232                     )
233
234             required = builder.do_eval(required, context=primary)
235
236             if fsaccess.exists(sfpath):
237                 if pattern is not None:
238                     found.append({"location": sfpath, "class": "File"})
239                 else:
240                     found.append(sf)
241             elif required:
242                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
243                     "Required secondary file '%s' does not exist" % sfpath)
244
245         primary["secondaryFiles"] = cmap(found)
246         if discovered is not None:
247             discovered[primary["location"]] = primary["secondaryFiles"]
248     elif inputschema["type"] not in primitive_types_set:
249         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
250
251 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
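    """Fill in secondaryFiles for File inputs in the job order.

    For every input parameter whose value is a File (or a list/record
    containing Files), expand the schema's secondaryFiles specification via
    set_secondary().  Newly found files are added to 'discovered', a dict of
    primary location -> secondaryFiles, when one is supplied.

    Sketch with illustrative values: an input of type File with
    secondaryFiles [{"pattern": ".tbi", "required": True}] and a job order
    value {"class": "File", "location": "keep:.../calls.vcf.gz",
    "basename": "calls.vcf.gz"} gains a secondary file entry for
    "calls.vcf.gz.tbi", provided fsaccess reports that it exists.
    """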
252     for inputschema in inputs:
253         primary = job_order.get(shortname(inputschema["id"]))
254         if isinstance(primary, (Mapping, Sequence)):
255             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
256
257 def upload_dependencies(arvrunner, name, document_loader,
258                         workflowobj, uri, loadref_run,
259                         include_primary=True, discovered_secondaryfiles=None):
260     """Upload the dependencies of the workflowobj document to Keep.
261
262     Returns a pathmapper object mapping local paths to keep references.  Also
263     does an in-place update of references in "workflowobj".
264
265     Use scandeps to find $import, $include, $schemas, run, File and Directory
266     fields that represent external references.
267
268     If workflowobj has an "id" field, this will reload the document to ensure
269     it is scanning the raw document prior to preprocessing.
270     """
271
272     loaded = set()
273     def loadref(b, u):
274         joined = document_loader.fetcher.urljoin(b, u)
275         defrg, _ = urllib.parse.urldefrag(joined)
276         if defrg not in loaded:
277             loaded.add(defrg)
278             # Use fetch_text to get raw file (before preprocessing).
279             text = document_loader.fetch_text(defrg)
280             if isinstance(text, bytes):
281                 textIO = StringIO(text.decode('utf-8'))
282             else:
283                 textIO = StringIO(text)
284             yamlloader = YAML(typ='safe', pure=True)
285             return yamlloader.load(textIO)
286         else:
287             return {}
288
289     if loadref_run:
290         loadref_fields = set(("$import", "run"))
291     else:
292         loadref_fields = set(("$import",))
293
294     scanobj = workflowobj
295     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
296         # Need raw file content (before preprocessing) to ensure
297         # that external references in $include and $mixin are captured.
298         scanobj = loadref("", workflowobj["id"])
299
300     metadata = scanobj
301
302     sc_result = scandeps(uri, scanobj,
303                          loadref_fields,
304                          set(("$include", "location")),
305                          loadref, urljoin=document_loader.fetcher.urljoin,
306                          nestdirs=False)
307
308     optional_deps = scandeps(uri, scanobj,
309                                   loadref_fields,
310                                   set(("$schemas",)),
311                                   loadref, urljoin=document_loader.fetcher.urljoin,
312                                   nestdirs=False)
313
314     sc_result.extend(optional_deps)
315
316     sc = []
317     uuids = {}
318
319     def collect_uuids(obj):
320         loc = obj.get("location", "")
321         sp = loc.split(":")
322         if sp[0] == "keep":
323             # Collect collection uuids that need to be resolved to
324             # portable data hashes
325             gp = collection_uuid_pattern.match(loc)
326             if gp:
327                 uuids[gp.groups()[0]] = obj
328             if collectionUUID in obj:
329                 uuids[obj[collectionUUID]] = obj
330
331     def collect_uploads(obj):
332         loc = obj.get("location", "")
333         sp = loc.split(":")
334         if len(sp) < 1:
335             return
336         if sp[0] in ("file", "http", "https"):
337             # Record local files that need to be uploaded,
338             # don't include file literals, keep references, etc.
339             sc.append(obj)
340         collect_uuids(obj)
341
342     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
343     visit_class(sc_result, ("File", "Directory"), collect_uploads)
344
345     # Resolve any collection uuids we found to portable data hashes
346     # and assign them to uuid_map
347     uuid_map = {}
348     fetch_uuids = list(uuids.keys())
349     while fetch_uuids:
350         # For a large number of fetch_uuids, the API server may limit the
351         # response size, so keep fetching until the API server has nothing
352         # more to give us.
353         lookups = arvrunner.api.collections().list(
354             filters=[["uuid", "in", fetch_uuids]],
355             count="none",
356             select=["uuid", "portable_data_hash"]).execute(
357                 num_retries=arvrunner.num_retries)
358
359         if not lookups["items"]:
360             break
361
362         for l in lookups["items"]:
363             uuid_map[l["uuid"]] = l["portable_data_hash"]
364
365         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
366
367     normalizeFilesDirs(sc)
368
369     if include_primary and "id" in workflowobj:
370         sc.append({"class": "File", "location": workflowobj["id"]})
371
372     def visit_default(obj):
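        # File/Directory objects that appear only as input defaults are added
        # to optional_deps rather than the required upload list.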
373         def defaults_are_optional(f):
374             if "location" not in f and "path" in f:
375                 f["location"] = f["path"]
376                 del f["path"]
377             normalizeFilesDirs(f)
378             optional_deps.append(f)
379         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
380
381     find_defaults(workflowobj, visit_default)
382
383     discovered = {}
384     def discover_default_secondary_files(obj):
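        # Build a synthetic job order from the tool's input defaults so that
        # secondaryFiles expressions can be evaluated against those defaults.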
385         builder_job_order = {}
386         for t in obj["inputs"]:
387             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
388         # Need to create a builder object to evaluate expressions.
389         builder = make_builder(builder_job_order,
390                                obj.get("hints", []),
391                                obj.get("requirements", []),
392                                ArvRuntimeContext(),
393                                metadata)
394         discover_secondary_files(arvrunner.fs_access,
395                                  builder,
396                                  obj["inputs"],
397                                  builder_job_order,
398                                  discovered)
399
400     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
401     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
402
403     for d in list(discovered):
404         # Only interested in discovered secondaryFiles which are local
405         # files that need to be uploaded.
406         if d.startswith("file:"):
407             sc.extend(discovered[d])
408         else:
409             del discovered[d]
410
411     mapper = ArvPathMapper(arvrunner, sc, "",
412                            "keep:%s",
413                            "keep:%s/%s",
414                            name=name,
415                            single_collection=True,
416                            optional_deps=optional_deps)
417
418     def setloc(p):
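        # Rewrite each File/Directory 'location' to its mapped keep: reference,
        # translating collection UUIDs to portable data hashes along the way.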
419         loc = p.get("location")
420         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
421             p["location"] = mapper.mapper(p["location"]).resolved
422             return
423
424         if not loc:
425             return
426
427         if collectionUUID in p:
428             uuid = p[collectionUUID]
429             if uuid not in uuid_map:
430                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
431                     "Collection uuid %s not found" % uuid)
432             gp = collection_pdh_pattern.match(loc)
433             if gp and uuid_map[uuid] != gp.groups()[0]:
434                 # This file entry has both collectionUUID and a PDH
435                 # location. If the PDH doesn't match the one returned by the
436                 # API server, raise an error.
437                 raise SourceLine(p, "location", validate.ValidationException).makeError(
438                     "Expected collection uuid %s to be %s but API server reported %s" % (
439                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
440
441         gp = collection_uuid_pattern.match(loc)
442         if not gp:
443             return
444         uuid = gp.groups()[0]
445         if uuid not in uuid_map:
446             raise SourceLine(p, "location", validate.ValidationException).makeError(
447                 "Collection uuid %s not found" % uuid)
448         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
449         p[collectionUUID] = uuid
450
451     visit_class(workflowobj, ("File", "Directory"), setloc)
452     visit_class(discovered, ("File", "Directory"), setloc)
453
454     if discovered_secondaryfiles is not None:
455         for d in discovered:
456             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
457
458     if "$schemas" in workflowobj:
459         sch = CommentedSeq()
460         for s in workflowobj["$schemas"]:
461             if s in mapper:
462                 sch.append(mapper.mapper(s).resolved)
463         workflowobj["$schemas"] = sch
464
465     return mapper
466
467
468 def upload_docker(arvrunner, tool):
469     """Uploads Docker images used in CommandLineTool objects."""
470
471     if isinstance(tool, CommandLineTool):
472         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
473         if docker_req:
474             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
475                 # TODO: can be supported by containers API, but not jobs API.
476                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
477                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
478             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
479                                                        arvrunner.runtimeContext.force_docker_pull,
480                                                        arvrunner.runtimeContext.tmp_outdir_prefix,
481                                                        arvrunner.runtimeContext.match_local_docker)
482         else:
483             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
484                                                        True, arvrunner.project_uuid,
485                                                        arvrunner.runtimeContext.force_docker_pull,
486                                                        arvrunner.runtimeContext.tmp_outdir_prefix,
487                                                        arvrunner.runtimeContext.match_local_docker)
488     elif isinstance(tool, cwltool.workflow.Workflow):
489         for s in tool.steps:
490             upload_docker(arvrunner, s.embedded_tool)
491
492
493 def packed_workflow(arvrunner, tool, merged_map):
494     """Create a packed workflow.
495
496     A "packed" workflow is one where all the components have been combined into a single document."""
497
498     rewrites = {}
499     packed = pack(arvrunner.loadingContext, tool.tool["id"],
500                   rewrite_out=rewrites,
501                   loader=tool.doc_loader)
502
503     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
504
505     def visit(v, cur_id):
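        # Walk the packed document, rewriting file locations and secondaryFiles
        # using merged_map and pinning each DockerRequirement to a collection PDH.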
506         if isinstance(v, dict):
507             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
508                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
509                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
510                 if "id" in v:
511                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
512             if "path" in v and "location" not in v:
513                 v["location"] = v["path"]
514                 del v["path"]
515             if "location" in v and cur_id in merged_map:
516                 if v["location"] in merged_map[cur_id].resolved:
517                     v["location"] = merged_map[cur_id].resolved[v["location"]]
518                 if v["location"] in merged_map[cur_id].secondaryFiles:
519                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
520             if v.get("class") == "DockerRequirement":
521                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
522                                                                                                              arvrunner.project_uuid,
523                                                                                                              arvrunner.runtimeContext.force_docker_pull,
524                                                                                                              arvrunner.runtimeContext.tmp_outdir_prefix,
525                                                                                                              arvrunner.runtimeContext.match_local_docker)
526             for l in v:
527                 visit(v[l], cur_id)
528         if isinstance(v, list):
529             for l in v:
530                 visit(l, cur_id)
531     visit(packed, None)
532     return packed
533
534
535 def tag_git_version(packed, tool):
536     if tool.tool["id"].startswith("file://"):
537         path = os.path.dirname(tool.tool["id"][7:])
538         try:
539             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
540         except (OSError, subprocess.CalledProcessError):
541             pass
542         else:
543             packed["http://schema.org/version"] = githash
544
545
546 def upload_job_order(arvrunner, name, tool, job_order):
547     """Upload local files referenced in the input object and return an updated
548     input object with 'location' rewritten to the proper Keep references.
549     """
550
551     # Make a copy of the job order and set defaults.
552     builder_job_order = copy.copy(job_order)
553
554     # fill_in_defaults throws an error if there are any
555     # missing required parameters, we don't want it to do that
556     # so make them all optional.
557     inputs_copy = copy.deepcopy(tool.tool["inputs"])
558     for i in inputs_copy:
559         if "null" not in i["type"]:
560             i["type"] = ["null"] + aslist(i["type"])
561
562     fill_in_defaults(inputs_copy,
563                      builder_job_order,
564                      arvrunner.fs_access)
565     # Need to create a builder object to evaluate expressions.
566     builder = make_builder(builder_job_order,
567                            tool.hints,
568                            tool.requirements,
569                            ArvRuntimeContext(),
570                            tool.metadata)
571     # Now update job_order with secondaryFiles
572     discover_secondary_files(arvrunner.fs_access,
573                              builder,
574                              tool.tool["inputs"],
575                              job_order)
576
577     jobmapper = upload_dependencies(arvrunner,
578                                     name,
579                                     tool.doc_loader,
580                                     job_order,
581                                     job_order.get("id", "#"),
582                                     False)
583
584     if "id" in job_order:
585         del job_order["id"]
586
587     # Need to filter this out, gets added by cwltool when providing
588     # parameters on the command line.
589     if "job_order" in job_order:
590         del job_order["job_order"]
591
592     return job_order
593
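# FileUpdates records, for a single tool document, the mapping of original file
# locations to resolved keep: locations ("resolved") and any secondaryFiles
# discovered for those files ("secondaryFiles").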
594 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
595
596 def upload_workflow_deps(arvrunner, tool):
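    """Upload Docker images and file dependencies for each tool in the workflow.

    Returns a dict mapping each tool document id to a FileUpdates tuple,
    which packed_workflow() later uses to rewrite file locations.
    """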
597     # Ensure that Docker images needed by this workflow are available
598
599     upload_docker(arvrunner, tool)
600
601     document_loader = tool.doc_loader
602
603     merged_map = {}
604
605     def upload_tool_deps(deptool):
606         if "id" in deptool:
607             discovered_secondaryfiles = {}
608             pm = upload_dependencies(arvrunner,
609                                      "%s dependencies" % (shortname(deptool["id"])),
610                                      document_loader,
611                                      deptool,
612                                      deptool["id"],
613                                      False,
614                                      include_primary=False,
615                                      discovered_secondaryfiles=discovered_secondaryfiles)
616             document_loader.idx[deptool["id"]] = deptool
617             toolmap = {}
618             for k,v in pm.items():
619                 toolmap[k] = v.resolved
620             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
621
622     tool.visit(upload_tool_deps)
623
624     return merged_map
625
626 def arvados_jobs_image(arvrunner, img):
627     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
628
629     try:
630         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
631                                                           arvrunner.runtimeContext.force_docker_pull,
632                                                           arvrunner.runtimeContext.tmp_outdir_prefix,
633                                                           arvrunner.runtimeContext.match_local_docker)
634     except Exception as e:
635         raise Exception("Docker image %s is not available\n%s" % (img, e) )
636
637
638 def upload_workflow_collection(arvrunner, name, packed):
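    """Store the packed workflow as "workflow.cwl" in a Keep collection.

    Reuses an existing collection with the same content and name prefix if one
    is found, otherwise saves a new one.  Returns the portable data hash.
    """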
639     collection = arvados.collection.Collection(api_client=arvrunner.api,
640                                                keep_client=arvrunner.keep_client,
641                                                num_retries=arvrunner.num_retries)
642     with collection.open("workflow.cwl", "w") as f:
643         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
644
645     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
646                ["name", "like", name+"%"]]
647     if arvrunner.project_uuid:
648         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
649     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
650
651     if exists["items"]:
652         logger.info("Using collection %s", exists["items"][0]["uuid"])
653     else:
654         collection.save_new(name=name,
655                             owner_uuid=arvrunner.project_uuid,
656                             ensure_unique_name=True,
657                             num_retries=arvrunner.num_retries)
658         logger.info("Uploaded to %s", collection.manifest_locator())
659
660     return collection.portable_data_hash()
661
662
663 class Runner(Process):
664     """Base class for runner processes, which submit an instance of
665     arvados-cwl-runner and wait for the final result."""
666
667     def __init__(self, runner, updated_tool,
668                  tool, loadingContext, enable_reuse,
669                  output_name, output_tags, submit_runner_ram=0,
670                  name=None, on_error=None, submit_runner_image=None,
671                  intermediate_output_ttl=0, merged_map=None,
672                  priority=None, secret_store=None,
673                  collection_cache_size=256,
674                  collection_cache_is_default=True):
675
676         loadingContext = loadingContext.copy()
677         loadingContext.metadata = updated_tool.metadata.copy()
678
679         super(Runner, self).__init__(updated_tool.tool, loadingContext)
680
681         self.arvrunner = runner
682         self.embedded_tool = tool
683         self.job_order = None
684         self.running = False
685         if enable_reuse:
686             # If reuse is permitted by command line arguments but
687             # disabled by the workflow itself, disable it.
688             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
689             if reuse_req:
690                 enable_reuse = reuse_req["enableReuse"]
691         self.enable_reuse = enable_reuse
692         self.uuid = None
693         self.final_output = None
694         self.output_name = output_name
695         self.output_tags = output_tags
696         self.name = name
697         self.on_error = on_error
698         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
699         self.intermediate_output_ttl = intermediate_output_ttl
700         self.priority = priority
701         self.secret_store = secret_store
702         self.enable_dev = loadingContext.enable_dev
703
704         self.submit_runner_cores = 1
705         self.submit_runner_ram = 1024  # default 1 GiB
706         self.collection_cache_size = collection_cache_size
707
708         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
709         if runner_resource_req:
710             if runner_resource_req.get("coresMin"):
711                 self.submit_runner_cores = runner_resource_req["coresMin"]
712             if runner_resource_req.get("ramMin"):
713                 self.submit_runner_ram = runner_resource_req["ramMin"]
714             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
715                 self.collection_cache_size = runner_resource_req["keep_cache"]
716
717         if submit_runner_ram:
718             # Command line / initializer overrides default and/or spec from workflow
719             self.submit_runner_ram = submit_runner_ram
720
721         if self.submit_runner_ram <= 0:
722             raise Exception("Value of submit-runner-ram must be greater than zero")
723
724         if self.submit_runner_cores <= 0:
725             raise Exception("Value of submit-runner-cores must be greater than zero")
726
727         self.merged_map = merged_map or {}
728
729     def job(self,
730             job_order,         # type: Mapping[Text, Text]
731             output_callbacks,  # type: Callable[[Any, Any], Any]
732             runtimeContext     # type: RuntimeContext
733            ):  # type: (...) -> Generator[Any, None, None]
734         self.job_order = job_order
735         self._init_job(job_order, runtimeContext)
736         yield self
737
738     def update_pipeline_component(self, record):
739         pass
740
741     def done(self, record):
742         """Base method for handling a completed runner."""
743
744         try:
745             if record["state"] == "Complete":
746                 if record.get("exit_code") is not None:
747                     if record["exit_code"] == 33:
748                         processStatus = "UnsupportedRequirement"
749                     elif record["exit_code"] == 0:
750                         processStatus = "success"
751                     else:
752                         processStatus = "permanentFail"
753                 else:
754                     processStatus = "success"
755             else:
756                 processStatus = "permanentFail"
757
758             outputs = {}
759
760             if processStatus == "permanentFail":
761                 logc = arvados.collection.CollectionReader(record["log"],
762                                                            api_client=self.arvrunner.api,
763                                                            keep_client=self.arvrunner.keep_client,
764                                                            num_retries=self.arvrunner.num_retries)
765                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
766
767             self.final_output = record["output"]
768             outc = arvados.collection.CollectionReader(self.final_output,
769                                                        api_client=self.arvrunner.api,
770                                                        keep_client=self.arvrunner.keep_client,
771                                                        num_retries=self.arvrunner.num_retries)
772             if "cwl.output.json" in outc:
773                 with outc.open("cwl.output.json", "rb") as f:
774                     if f.size() > 0:
775                         outputs = json.loads(f.read().decode())
776             def keepify(fileobj):
777                 path = fileobj["location"]
778                 if not path.startswith("keep:"):
779                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
780             adjustFileObjs(outputs, keepify)
781             adjustDirObjs(outputs, keepify)
782         except Exception:
783             logger.exception("[%s] While getting final output object", self.name)
784             self.arvrunner.output_callback({}, "permanentFail")
785         else:
786             self.arvrunner.output_callback(outputs, processStatus)