[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 from ruamel.yaml import YAML
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from .context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
54 def trim_anonymous_location(obj):
55     """Remove 'location' field from File and Directory literals.
56
57     To make internal handling easier, literals are assigned a random id for
58     'location'.  However, when writing the record back out, this can break
59     reproducibility.  Since it is valid for literals not to have a 'location'
60     field, remove it.
61
62     """
63
64     if obj.get("location", "").startswith("_:"):
65         del obj["location"]
66
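# Illustrative sketch (not part of the upstream module): a File literal that
# was assigned a synthetic "_:" id keeps its contents but loses the 'location'
# field when passed through trim_anonymous_location().
def _example_trim_anonymous_location():
    literal = {"class": "File", "basename": "greeting.txt",
               "contents": "hello", "location": "_:b0b9f5a6"}
    trim_anonymous_location(literal)
    assert "location" not in literal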
67
68 def remove_redundant_fields(obj):
69     for field in ("path", "nameext", "nameroot", "dirname"):
70         if field in obj:
71             del obj[field]
72
73
74 def find_defaults(d, op):
75     if isinstance(d, list):
76         for i in d:
77             find_defaults(i, op)
78     elif isinstance(d, dict):
79         if "default" in d:
80             op(d)
81         else:
82             for i in viewvalues(d):
83                 find_defaults(i, op)
84
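# Minimal sketch (hypothetical input document): find_defaults() applies op()
# to every dict that carries a "default" key, not to the default value itself.
def _example_find_defaults():
    seen = []
    doc = {"inputs": [{"id": "threshold", "type": "float", "default": 0.5},
                      {"id": "sample", "type": "File"}]}
    find_defaults(doc, seen.append)
    assert seen == [{"id": "threshold", "type": "float", "default": 0.5}]
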
85 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
86     return Builder(
87                  job=joborder,
88                  files=[],               # type: List[Dict[Text, Text]]
89                  bindings=[],            # type: List[Dict[Text, Any]]
90                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
91                  names=None,               # type: Names
92                  requirements=requirements,        # type: List[Dict[Text, Any]]
93                  hints=hints,               # type: List[Dict[Text, Any]]
94                  resources={},           # type: Dict[str, int]
95                  mutation_manager=None,    # type: Optional[MutationManager]
96                  formatgraph=None,         # type: Optional[Graph]
97                  make_fs_access=None,      # type: Type[StdFsAccess]
98                  fs_access=None,           # type: StdFsAccess
99                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
100                  timeout=runtimeContext.eval_timeout,             # type: float
101                  debug=runtimeContext.debug,               # type: bool
102                  js_console=runtimeContext.js_console,          # type: bool
103                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
104                  loadListing="",         # type: Text
105                  outdir="",              # type: Text
106                  tmpdir="",              # type: Text
107                  stagedir="",            # type: Text
108                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
109                  container_engine="docker"
110                 )
111
112 def search_schemadef(name, reqs):
113     for r in reqs:
114         if r["class"] == "SchemaDefRequirement":
115             for sd in r["types"]:
116                 if sd["name"] == name:
117                     return sd
118     return None
119
120 primitive_types_set = frozenset(("null", "boolean", "int", "long",
121                                  "float", "double", "string", "record",
122                                  "array", "enum"))
123
124 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
125     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
126         # union type, collect all possible secondaryFiles
127         for i in inputschema:
128             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
129         return
130
131     if inputschema == "File":
132         inputschema = {"type": "File"}
133
134     if isinstance(inputschema, basestring):
135         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
136         if sd:
137             inputschema = sd
138         else:
139             return
140
141     if "secondaryFiles" in inputschema:
142         # set secondaryFiles, may be inherited by compound types.
143         secondaryspec = inputschema["secondaryFiles"]
144
145     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
146         not isinstance(inputschema["type"], basestring)):
147         # compound type (union, array, record)
148         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
149
150     elif (inputschema["type"] == "record" and
151           isinstance(primary, Mapping)):
152         #
153         # record type, find secondary files associated with fields.
154         #
155         for f in inputschema["fields"]:
156             p = primary.get(shortname(f["name"]))
157             if p:
158                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
159
160     elif (inputschema["type"] == "array" and
161           isinstance(primary, Sequence)):
162         #
163         # array type, find secondary files of elements
164         #
165         for p in primary:
166             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
167
168     elif (inputschema["type"] == "File" and
169           secondaryspec and
170           isinstance(primary, Mapping) and
171           primary.get("class") == "File" and
172           "secondaryFiles" not in primary):
173         #
174         # Found a file, check for secondaryFiles
175         #
176         specs = []
177         primary["secondaryFiles"] = secondaryspec
178         for i, sf in enumerate(aslist(secondaryspec)):
179             if builder.cwlVersion == "v1.0":
180                 pattern = builder.do_eval(sf, context=primary)
181             else:
182                 pattern = builder.do_eval(sf["pattern"], context=primary)
183             if pattern is None:
184                 continue
185             if isinstance(pattern, list):
186                 specs.extend(pattern)
187             elif isinstance(pattern, dict):
188                 specs.append(pattern)
189             elif isinstance(pattern, str):
190                 if builder.cwlVersion == "v1.0":
191                     specs.append({"pattern": pattern, "required": True})
192                 else:
193                     specs.append({"pattern": pattern, "required": sf.get("required")})
194             else:
195                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
196                     "Expression must return list, object, string or null")
197
198         found = []
199         for i, sf in enumerate(specs):
200             if isinstance(sf, dict):
201                 if sf.get("class") == "File":
202                     pattern = None
203                     if sf.get("location") is None:
204                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
205                             "File object is missing 'location': %s" % sf)
206                     sfpath = sf["location"]
207                     required = True
208                 else:
209                     pattern = sf["pattern"]
210                     required = sf.get("required")
211             elif isinstance(sf, str):
212                 pattern = sf
213                 required = True
214             else:
215                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
216                     "Expression must return list, object, string or null")
217
218             if pattern is not None:
219                 sfpath = substitute(primary["location"], pattern)
220
221             required = builder.do_eval(required, context=primary)
222
223             if fsaccess.exists(sfpath):
224                 if pattern is not None:
225                     found.append({"location": sfpath, "class": "File"})
226                 else:
227                     found.append(sf)
228             elif required:
229                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
230                     "Required secondary file '%s' does not exist" % sfpath)
231
232         primary["secondaryFiles"] = cmap(found)
233         if discovered is not None:
234             discovered[primary["location"]] = primary["secondaryFiles"]
235     elif inputschema["type"] not in primitive_types_set:
236         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
237
238 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
239     for inputschema in inputs:
240         primary = job_order.get(shortname(inputschema["id"]))
241         if isinstance(primary, (Mapping, Sequence)):
242             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
243
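# Minimal sketch of the pattern expansion performed above, assuming the
# standard CWL secondaryFiles rules implemented by cwltool's substitute():
# a plain suffix is appended to the primary file's location, while each
# leading "^" strips one extension first.
def _example_secondary_file_patterns():
    assert substitute("file:///data/sample.bam", ".bai") == "file:///data/sample.bam.bai"
    assert substitute("file:///data/sample.bam", "^.bai") == "file:///data/sample.bai"
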
244 def upload_dependencies(arvrunner, name, document_loader,
245                         workflowobj, uri, loadref_run,
246                         include_primary=True, discovered_secondaryfiles=None):
247     """Upload the dependencies of the workflowobj document to Keep.
248
249     Returns a pathmapper object mapping local paths to keep references.  Also
250     does an in-place update of references in "workflowobj".
251
252     Use scandeps to find $import, $include, $schemas, run, File and Directory
253     fields that represent external references.
254
255     If workflowobj has an "id" field, this will reload the document to ensure
256     it is scanning the raw document prior to preprocessing.
257     """
258
259     loaded = set()
260     def loadref(b, u):
261         joined = document_loader.fetcher.urljoin(b, u)
262         defrg, _ = urllib.parse.urldefrag(joined)
263         if defrg not in loaded:
264             loaded.add(defrg)
265             # Use fetch_text to get raw file (before preprocessing).
266             text = document_loader.fetch_text(defrg)
267             if isinstance(text, bytes):
268                 textIO = StringIO(text.decode('utf-8'))
269             else:
270                 textIO = StringIO(text)
271             yamlloader = YAML(typ='safe', pure=True)
272             return yamlloader.load(textIO)
273         else:
274             return {}
275
276     if loadref_run:
277         loadref_fields = set(("$import", "run"))
278     else:
279         loadref_fields = set(("$import",))
280
281     scanobj = workflowobj
282     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
283         # Need raw file content (before preprocessing) to ensure
284         # that external references in $include and $mixin are captured.
285         scanobj = loadref("", workflowobj["id"])
286
287     metadata = scanobj
288
289     sc_result = scandeps(uri, scanobj,
290                          loadref_fields,
291                          set(("$include", "location")),
292                          loadref, urljoin=document_loader.fetcher.urljoin,
293                          nestdirs=False)
294
295     optional_deps = scandeps(uri, scanobj,
296                                   loadref_fields,
297                                   set(("$schemas",)),
298                                   loadref, urljoin=document_loader.fetcher.urljoin,
299                                   nestdirs=False)
300
301     sc_result.extend(optional_deps)
302
303     sc = []
304     uuids = {}
305
306     def collect_uuids(obj):
307         loc = obj.get("location", "")
308         sp = loc.split(":")
309         if sp[0] == "keep":
310             # Collect collection uuids that need to be resolved to
311             # portable data hashes
312             gp = collection_uuid_pattern.match(loc)
313             if gp:
314                 uuids[gp.groups()[0]] = obj
315             if collectionUUID in obj:
316                 uuids[obj[collectionUUID]] = obj
317
318     def collect_uploads(obj):
319         loc = obj.get("location", "")
320         sp = loc.split(":")
321         if len(sp) < 1:
322             return
323         if sp[0] in ("file", "http", "https"):
324             # Record local files that need to be uploaded;
325             # don't include file literals, keep references, etc.
326             sc.append(obj)
327         collect_uuids(obj)
328
329     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
330     visit_class(sc_result, ("File", "Directory"), collect_uploads)
331
332     # Resolve any collection uuids we found to portable data hashes
333     # and assign them to uuid_map
334     uuid_map = {}
335     fetch_uuids = list(uuids.keys())
336     while fetch_uuids:
337         # For a large number of fetch_uuids, the API server may limit the
338         # response size, so keep fetching until the API server has nothing
339         # more to give us.
340         lookups = arvrunner.api.collections().list(
341             filters=[["uuid", "in", fetch_uuids]],
342             count="none",
343             select=["uuid", "portable_data_hash"]).execute(
344                 num_retries=arvrunner.num_retries)
345
346         if not lookups["items"]:
347             break
348
349         for l in lookups["items"]:
350             uuid_map[l["uuid"]] = l["portable_data_hash"]
351
352         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
353
354     normalizeFilesDirs(sc)
355
356     if include_primary and "id" in workflowobj:
357         sc.append({"class": "File", "location": workflowobj["id"]})
358
359     def visit_default(obj):
360         def defaults_are_optional(f):
361             if "location" not in f and "path" in f:
362                 f["location"] = f["path"]
363                 del f["path"]
364             normalizeFilesDirs(f)
365             optional_deps.append(f)
366         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
367
368     find_defaults(workflowobj, visit_default)
369
370     discovered = {}
371     def discover_default_secondary_files(obj):
372         builder_job_order = {}
373         for t in obj["inputs"]:
374             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
375         # Need to create a builder object to evaluate expressions.
376         builder = make_builder(builder_job_order,
377                                obj.get("hints", []),
378                                obj.get("requirements", []),
379                                ArvRuntimeContext(),
380                                metadata)
381         discover_secondary_files(arvrunner.fs_access,
382                                  builder,
383                                  obj["inputs"],
384                                  builder_job_order,
385                                  discovered)
386
387     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
388     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
389
390     for d in list(discovered):
391         # Only interested in discovered secondaryFiles which are local
392         # files that need to be uploaded.
393         if d.startswith("file:"):
394             sc.extend(discovered[d])
395         else:
396             del discovered[d]
397
398     mapper = ArvPathMapper(arvrunner, sc, "",
399                            "keep:%s",
400                            "keep:%s/%s",
401                            name=name,
402                            single_collection=True,
403                            optional_deps=optional_deps)
404
405     def setloc(p):
406         loc = p.get("location")
407         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
408             p["location"] = mapper.mapper(p["location"]).resolved
409             return
410
411         if not loc:
412             return
413
414         if collectionUUID in p:
415             uuid = p[collectionUUID]
416             if uuid not in uuid_map:
417                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
418                     "Collection uuid %s not found" % uuid)
419             gp = collection_pdh_pattern.match(loc)
420             if gp and uuid_map[uuid] != gp.groups()[0]:
421                 # This file entry has both collectionUUID and a PDH
422                 # location. If the PDH doesn't match the one returned by
423                 # the API server, raise an error.
424                 raise SourceLine(p, "location", validate.ValidationException).makeError(
425                     "Expected collection uuid %s to be %s but API server reported %s" % (
426                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
427
428         gp = collection_uuid_pattern.match(loc)
429         if not gp:
430             return
431         uuid = gp.groups()[0]
432         if uuid not in uuid_map:
433             raise SourceLine(p, "location", validate.ValidationException).makeError(
434                 "Collection uuid %s not found" % uuid)
435         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
436         p[collectionUUID] = uuid
437
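    # For example (identifiers below are hypothetical), setloc rewrites a
    # location given as a collection UUID,
    #     keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/ref.fa
    # to the corresponding portable data hash form,
    #     keep:99999999999999999999999999999999+118/ref.fa
    # while recording the original UUID under the collectionUUID key.
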
438     visit_class(workflowobj, ("File", "Directory"), setloc)
439     visit_class(discovered, ("File", "Directory"), setloc)
440
441     if discovered_secondaryfiles is not None:
442         for d in discovered:
443             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
444
445     if "$schemas" in workflowobj:
446         sch = CommentedSeq()
447         for s in workflowobj["$schemas"]:
448             if s in mapper:
449                 sch.append(mapper.mapper(s).resolved)
450         workflowobj["$schemas"] = sch
451
452     return mapper
453
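# Hypothetical usage sketch: for a tool whose inputs reference local files, the
# pathmapper returned above translates each local location into a Keep
# reference (the tool object and the portable data hash below are made up).
#
#   mapper = upload_dependencies(arvrunner, "example dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                False)
#   mapper.mapper("file:///home/me/ref.fa").resolved
#   # -> "keep:99999999999999999999999999999999+118/ref.fa"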
454
455 def upload_docker(arvrunner, tool):
456     """Uploads Docker images used in CommandLineTool objects."""
457
458     if isinstance(tool, CommandLineTool):
459         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
460         if docker_req:
461             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
462                 # TODO: can be supported by containers API, but not jobs API.
463                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
464                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
465             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
466                                                        arvrunner.runtimeContext.force_docker_pull,
467                                                        arvrunner.runtimeContext.tmp_outdir_prefix,
468                                                        arvrunner.runtimeContext.match_local_docker)
469         else:
470             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
471                                                        True, arvrunner.project_uuid,
472                                                        arvrunner.runtimeContext.force_docker_pull,
473                                                        arvrunner.runtimeContext.tmp_outdir_prefix,
474                                                        arvrunner.runtimeContext.match_local_docker)
475     elif isinstance(tool, cwltool.workflow.Workflow):
476         for s in tool.steps:
477             upload_docker(arvrunner, s.embedded_tool)
478
479
480 def packed_workflow(arvrunner, tool, merged_map):
481     """Create a packed workflow.
482
483     A "packed" workflow is one where all the components have been combined into a single document."""
484
485     rewrites = {}
486     packed = pack(arvrunner.loadingContext, tool.tool["id"],
487                   rewrite_out=rewrites,
488                   loader=tool.doc_loader)
489
490     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
491
492     def visit(v, cur_id):
493         if isinstance(v, dict):
494             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
495                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
496                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
497                 if "id" in v:
498                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
499             if "path" in v and "location" not in v:
500                 v["location"] = v["path"]
501                 del v["path"]
502             if "location" in v and cur_id in merged_map:
503                 if v["location"] in merged_map[cur_id].resolved:
504                     v["location"] = merged_map[cur_id].resolved[v["location"]]
505                 if v["location"] in merged_map[cur_id].secondaryFiles:
506                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
507             if v.get("class") == "DockerRequirement":
508                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
509                                                                                                              arvrunner.project_uuid,
510                                                                                                              arvrunner.runtimeContext.force_docker_pull,
511                                                                                                              arvrunner.runtimeContext.tmp_outdir_prefix,
512                                                                                                              arvrunner.runtimeContext.match_local_docker)
513             for l in v:
514                 visit(v[l], cur_id)
515         if isinstance(v, list):
516             for l in v:
517                 visit(l, cur_id)
518     visit(packed, None)
519     return packed
520
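# Illustrative sketch of the packed document produced above (fields elided):
# a multi-step workflow becomes a single object whose "$graph" holds the
# top-level Workflow under "#main" plus every embedded tool, so each component
# can be addressed as a fragment of one document.
#
#   {"cwlVersion": "v1.1",
#    "$graph": [{"class": "Workflow", "id": "#main", "steps": [...]},
#               {"class": "CommandLineTool", "id": "#step-tool.cwl", ...}]}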
521
522 def tag_git_version(packed, tool):
523     if tool.tool["id"].startswith("file://"):
524         path = os.path.dirname(tool.tool["id"][7:])
525         try:
526             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip().decode("utf-8")
527         except (OSError, subprocess.CalledProcessError):
528             pass
529         else:
530             packed["http://schema.org/version"] = githash
531
532
533 def upload_job_order(arvrunner, name, tool, job_order):
534     """Upload local files referenced in the input object and return updated input
535     object with 'location' updated to the proper keep references.
536     """
537
538     # Make a copy of the job order and set defaults.
539     builder_job_order = copy.copy(job_order)
540
541     # fill_in_defaults throws an error if there are any
542     # missing required parameters; we don't want it to do that,
543     # so make them all optional.
544     inputs_copy = copy.deepcopy(tool.tool["inputs"])
545     for i in inputs_copy:
546         if "null" not in i["type"]:
547             i["type"] = ["null"] + aslist(i["type"])
548
549     fill_in_defaults(inputs_copy,
550                      builder_job_order,
551                      arvrunner.fs_access)
552     # Need to create a builder object to evaluate expressions.
553     builder = make_builder(builder_job_order,
554                            tool.hints,
555                            tool.requirements,
556                            ArvRuntimeContext(),
557                            tool.metadata)
558     # Now update job_order with secondaryFiles
559     discover_secondary_files(arvrunner.fs_access,
560                              builder,
561                              tool.tool["inputs"],
562                              job_order)
563
564     jobmapper = upload_dependencies(arvrunner,
565                                     name,
566                                     tool.doc_loader,
567                                     job_order,
568                                     job_order.get("id", "#"),
569                                     False)
570
571     if "id" in job_order:
572         del job_order["id"]
573
574     # Need to filter this out; it gets added by cwltool when providing
575     # parameters on the command line.
576     if "job_order" in job_order:
577         del job_order["job_order"]
578
579     return job_order
580
581 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
582
583 def upload_workflow_deps(arvrunner, tool):
584     # Ensure that Docker images needed by this workflow are available
585
586     upload_docker(arvrunner, tool)
587
588     document_loader = tool.doc_loader
589
590     merged_map = {}
591
592     def upload_tool_deps(deptool):
593         if "id" in deptool:
594             discovered_secondaryfiles = {}
595             pm = upload_dependencies(arvrunner,
596                                      "%s dependencies" % (shortname(deptool["id"])),
597                                      document_loader,
598                                      deptool,
599                                      deptool["id"],
600                                      False,
601                                      include_primary=False,
602                                      discovered_secondaryfiles=discovered_secondaryfiles)
603             document_loader.idx[deptool["id"]] = deptool
604             toolmap = {}
605             for k,v in pm.items():
606                 toolmap[k] = v.resolved
607             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
608
609     tool.visit(upload_tool_deps)
610
611     return merged_map
612
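# Illustrative sketch of the mapping returned above (identifiers hypothetical):
# each tool id is mapped to a FileUpdates tuple pairing resolved Keep locations
# with any secondaryFiles discovered for them.
#
#   {"file:///home/me/wf.cwl": FileUpdates(
#       resolved={"file:///home/me/ref.fa":
#                 "keep:99999999999999999999999999999999+118/ref.fa"},
#       secondaryFiles={})}
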
613 def arvados_jobs_image(arvrunner, img):
614     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
615
616     try:
617         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
618                                                           arvrunner.runtimeContext.force_docker_pull,
619                                                           arvrunner.runtimeContext.tmp_outdir_prefix,
620                                                           arvrunner.runtimeContext.match_local_docker)
621     except Exception as e:
622         raise Exception("Docker image %s is not available\n%s" % (img, e) )
623
624
625 def upload_workflow_collection(arvrunner, name, packed):
626     collection = arvados.collection.Collection(api_client=arvrunner.api,
627                                                keep_client=arvrunner.keep_client,
628                                                num_retries=arvrunner.num_retries)
629     with collection.open("workflow.cwl", "w") as f:
630         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
631
632     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
633                ["name", "like", name+"%"]]
634     if arvrunner.project_uuid:
635         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
636     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
637
638     if exists["items"]:
639         logger.info("Using collection %s", exists["items"][0]["uuid"])
640     else:
641         collection.save_new(name=name,
642                             owner_uuid=arvrunner.project_uuid,
643                             ensure_unique_name=True,
644                             num_retries=arvrunner.num_retries)
645         logger.info("Uploaded to %s", collection.manifest_locator())
646
647     return collection.portable_data_hash()
648
649
650 class Runner(Process):
651     """Base class for runner processes, which submit an instance of
652     arvados-cwl-runner and wait for the final result."""
653
654     def __init__(self, runner, updated_tool,
655                  tool, loadingContext, enable_reuse,
656                  output_name, output_tags, submit_runner_ram=0,
657                  name=None, on_error=None, submit_runner_image=None,
658                  intermediate_output_ttl=0, merged_map=None,
659                  priority=None, secret_store=None,
660                  collection_cache_size=256,
661                  collection_cache_is_default=True):
662
663         loadingContext = loadingContext.copy()
664         loadingContext.metadata = updated_tool.metadata.copy()
665
666         super(Runner, self).__init__(updated_tool.tool, loadingContext)
667
668         self.arvrunner = runner
669         self.embedded_tool = tool
670         self.job_order = None
671         self.running = False
672         if enable_reuse:
673             # If reuse is permitted by command line arguments but
674             # disabled by the workflow itself, disable it.
675             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
676             if reuse_req:
677                 enable_reuse = reuse_req["enableReuse"]
678         self.enable_reuse = enable_reuse
679         self.uuid = None
680         self.final_output = None
681         self.output_name = output_name
682         self.output_tags = output_tags
683         self.name = name
684         self.on_error = on_error
685         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
686         self.intermediate_output_ttl = intermediate_output_ttl
687         self.priority = priority
688         self.secret_store = secret_store
689         self.enable_dev = loadingContext.enable_dev
690
691         self.submit_runner_cores = 1
692         self.submit_runner_ram = 1024  # default 1 GiB
693         self.collection_cache_size = collection_cache_size
694
695         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
696         if runner_resource_req:
697             if runner_resource_req.get("coresMin"):
698                 self.submit_runner_cores = runner_resource_req["coresMin"]
699             if runner_resource_req.get("ramMin"):
700                 self.submit_runner_ram = runner_resource_req["ramMin"]
701             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
702                 self.collection_cache_size = runner_resource_req["keep_cache"]
703
704         if submit_runner_ram:
705             # Command line / initializer overrides default and/or spec from workflow
706             self.submit_runner_ram = submit_runner_ram
707
708         if self.submit_runner_ram <= 0:
709             raise Exception("Value of submit-runner-ram must be greater than zero")
710
711         if self.submit_runner_cores <= 0:
712             raise Exception("Value of submit-runner-cores must be greater than zero")
713
714         self.merged_map = merged_map or {}
715
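    # The resource hint consulted above can be set in the workflow document
    # itself, e.g. (illustrative sketch; "arv:" is assumed to map to the
    # http://arvados.org/cwl# namespace):
    #
    #   hints:
    #     arv:WorkflowRunnerResources:
    #       coresMin: 2
    #       ramMin: 2048
    #       keep_cache: 512
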
716     def job(self,
717             job_order,         # type: Mapping[Text, Text]
718             output_callbacks,  # type: Callable[[Any, Any], Any]
719             runtimeContext     # type: RuntimeContext
720            ):  # type: (...) -> Generator[Any, None, None]
721         self.job_order = job_order
722         self._init_job(job_order, runtimeContext)
723         yield self
724
725     def update_pipeline_component(self, record):
726         pass
727
728     def done(self, record):
729         """Base method for handling a completed runner."""
730
731         try:
732             if record["state"] == "Complete":
733                 if record.get("exit_code") is not None:
734                     if record["exit_code"] == 33:
735                         processStatus = "UnsupportedRequirement"
736                     elif record["exit_code"] == 0:
737                         processStatus = "success"
738                     else:
739                         processStatus = "permanentFail"
740                 else:
741                     processStatus = "success"
742             else:
743                 processStatus = "permanentFail"
744
745             outputs = {}
746
747             if processStatus == "permanentFail":
748                 logc = arvados.collection.CollectionReader(record["log"],
749                                                            api_client=self.arvrunner.api,
750                                                            keep_client=self.arvrunner.keep_client,
751                                                            num_retries=self.arvrunner.num_retries)
752                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
753
754             self.final_output = record["output"]
755             outc = arvados.collection.CollectionReader(self.final_output,
756                                                        api_client=self.arvrunner.api,
757                                                        keep_client=self.arvrunner.keep_client,
758                                                        num_retries=self.arvrunner.num_retries)
759             if "cwl.output.json" in outc:
760                 with outc.open("cwl.output.json", "rb") as f:
761                     if f.size() > 0:
762                         outputs = json.loads(f.read().decode())
763             def keepify(fileobj):
764                 path = fileobj["location"]
765                 if not path.startswith("keep:"):
766                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
767             adjustFileObjs(outputs, keepify)
768             adjustDirObjs(outputs, keepify)
769         except Exception:
770             logger.exception("[%s] While getting final output object", self.name)
771             self.arvrunner.output_callback({}, "permanentFail")
772         else:
773             self.arvrunner.output_callback(outputs, processStatus)