18339: Move trash-sweep from background thread to controller action.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 import ruamel.yaml as yaml
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from . context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
def trim_anonymous_location(obj):
    """Drop the 'location' of a File or Directory literal.

    Literals are given a random "_:"-prefixed 'location' so they are
    easier to handle internally, but writing that id back out can break
    reproducibility.  A literal is still valid without a 'location', so
    the field is deleted; any other location is left untouched.

    """

    location = obj.get("location", "")
    if not location.startswith("_:"):
        return
    del obj["location"]
66
67
def remove_redundant_fields(obj):
    """Delete the 'path', 'nameext', 'nameroot' and 'dirname' keys from obj.

    Keys that are not present are skipped; obj is modified in place.
    """
    for field in ("path", "nameext", "nameroot", "dirname"):
        obj.pop(field, None)
72
73
def find_defaults(d, op):
    """Call op() on every dict in the nested structure d that has a "default" key.

    Lists and dicts are searched recursively.  A dict that has a
    "default" key is handed to op() and is not searched any further.
    Values that are neither lists nor dicts are ignored.
    """
    if isinstance(d, list):
        children = d
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
            return
        children = viewvalues(d)
    else:
        return

    for child in children:
        find_defaults(child, op)
84
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    """Create a minimal cwltool Builder for evaluating expressions.

    The job object, hints and requirements are taken from the arguments;
    the job script provider, evaluation timeout, debug, JS console and
    force-docker-pull settings come from runtimeContext.  The CWL version
    is metadata's "http://commonwl.org/cwltool#original_cwlVersion" entry
    when that is set, otherwise its "cwlVersion" entry.  Everything else
    (files, bindings, schema definitions, resources, filesystem access,
    working directories) is passed as empty or None, and the container
    engine is "docker".
    """
    return Builder(
        job=joborder,
        files=[],
        bindings=[],
        schemaDefs={},
        names=None,
        requirements=requirements,
        hints=hints,
        resources={},
        mutation_manager=None,
        formatgraph=None,
        make_fs_access=None,
        fs_access=None,
        job_script_provider=runtimeContext.job_script_provider,
        timeout=runtimeContext.eval_timeout,
        debug=runtimeContext.debug,
        js_console=runtimeContext.js_console,
        force_docker_pull=runtimeContext.force_docker_pull,
        loadListing="",
        outdir="",
        tmpdir="",
        stagedir="",
        cwlVersion=(metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                    or metadata.get("cwlVersion")),
        container_engine="docker",
    )
111
def search_schemadef(name, reqs):
    """Look up a named type among the SchemaDefRequirement entries of reqs.

    reqs is an iterable of requirement/hint mappings.  Returns the first
    entry of a SchemaDefRequirement's "types" list whose "name" equals
    name, or None when there is no such entry.
    """
    for req in reqs:
        if req["class"] != "SchemaDefRequirement":
            continue
        for typedef in req["types"]:
            if typedef["name"] == name:
                return typedef
    return None
119
# Type names that set_secondary() does not try to resolve as a named
# schema type when it reaches them.
primitive_types_set = frozenset([
    "null", "boolean", "int", "long", "float",
    "double", "string", "record", "array", "enum",
])
123
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    """Resolve and attach 'secondaryFiles' for File values matching an input schema.

    Walks 'inputschema' and the input value 'primary' side by side,
    recursing through union, record and array types, and through named
    types looked up with search_schemadef().  When it reaches a File value
    that has no 'secondaryFiles' of its own while a secondaryFiles spec is
    in effect, it evaluates the spec, keeps the candidates that exist
    according to fsaccess, and stores them in primary["secondaryFiles"].

    Arguments:
      fsaccess: object whose exists() method is used to test each
        candidate secondary file location.
      builder: Builder used to evaluate expressions (do_eval); its
        cwlVersion, hints and requirements are also consulted.
      inputschema: a type name (string), a list of types (union), or a
        schema mapping with a "type" key.
      secondaryspec: the secondaryFiles spec inherited from an enclosing
        schema, or None.  Replaced by inputschema["secondaryFiles"] when
        that key is present.
      primary: the input value corresponding to inputschema; modified in
        place.
      discovered: if not None, a dict that receives
        primary["location"] -> secondaryFiles for every File filled in.

    Raises the exception built by
    SourceLine(..., validate.ValidationException).makeError() when a spec
    expression evaluates to something other than a list, object, string
    or null, when a File object in the spec has no 'location', or when a
    required secondary file does not exist.
    """
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        # A bare type name: only continue if it names a type declared in a
        # SchemaDefRequirement (requirements are searched before hints).
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        # Hold the raw spec on primary for now: the error paths below pass
        # primary["secondaryFiles"] to SourceLine.  It is overwritten with
        # the resolved list once all entries have been checked.
        primary["secondaryFiles"] = secondaryspec

        # First pass: evaluate every spec entry and flatten the results
        # into 'specs'.
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                # In v1.0 the entry itself is evaluated.
                pattern = builder.do_eval(sf, context=primary)
            else:
                # Otherwise the entry is a mapping with a "pattern" key.
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        # Second pass: turn each spec into a location, and keep the ones
        # that exist.
        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    # Already a File object; use its location as-is.
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                # Derive the secondary file location from the primary's.
                sfpath = substitute(primary["location"], pattern)

            # 'required' is itself passed through the expression evaluator.
            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        # The type is some other name; recurse with the bare name so the
        # string branch above can look it up with search_schemadef().
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
234
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    """Run set_secondary() for every input that has a structured value.

    For each schema in inputs, the value is looked up in job_order under
    the short name of the schema's "id".  Values that are neither a
    Mapping nor a Sequence (including missing ones) are skipped.
    """
    for schema in inputs:
        value = job_order.get(shortname(schema["id"]))
        if not isinstance(value, (Mapping, Sequence)):
            continue
        set_secondary(fsaccess, builder, schema, None, value, discovered)
240
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field (not starting with "_:"), this will
    reload the document to ensure it is scanning the raw document prior to
    preprocessing.

    Arguments:
      arvrunner: supplies the API client, num_retries and fs_access used
        here, and is passed on to ArvPathMapper.
      name: passed to ArvPathMapper as 'name'.
      document_loader: loader used to join URLs, fetch raw document text
        and resolve the document.
      workflowobj: the document to scan; updated in place.
      uri: base URI handed to scandeps and resolve_all.
      loadref_run: when true, "run" fields are followed in addition to
        "$import".
      include_primary: when true and workflowobj has an "id", the document
        itself is added to the list of files to upload.
      discovered_secondaryfiles: if not None, a dict that receives
        mapped location -> secondaryFiles for the secondary files
        discovered on local ("file:") default inputs.
    """

    loaded = set()
    # Loader callback for scandeps: returns the parsed raw document the
    # first time a given (defragmented) URL is requested, and an empty
    # dict on repeat requests.
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    # Passed to make_builder() below, which reads the CWL version from it.
    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    # sc: File/Directory objects that need to be uploaded.
    # uuids: collection uuid -> an object that referenced it.
    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        # NOTE(review): str.split() always returns at least one element,
        # so this guard appears to be unreachable.
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        # Only ask again for the uuids that have not been resolved yet.
        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    # Called by find_defaults() for every mapping that has a "default"
    # key: normalizes 'path' to 'location' on File/Directory defaults, and
    # drops the whole default when any of its locations does not exist.
    def visit_default(obj):
        # One-element list so the nested function can set the flag.
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    # primary location -> secondaryFiles found for default input values.
    discovered = {}
    def discover_default_secondary_files(obj):
        # Build a job order consisting only of the inputs' default values
        # (None for inputs without a default).
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    # Secondary file discovery runs on a resolved deep copy, so
    # workflowobj itself is not modified by this step.
    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    # Iterate over a copy of the keys because entries are deleted below.
    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    # Rewrite the 'location' of one File/Directory object: anything that
    # is neither a literal ("_:") nor a keep reference is replaced by its
    # mapped location; keep references by collection uuid are rewritten
    # to use the portable data hash.
    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        # Replace the uuid with the PDH in the location, and remember the
        # uuid it came from.
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        # Replace $schemas with the mapped locations; entries the mapper
        # does not know about are dropped.
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
449
450
def upload_docker(arvrunner, tool):
    """Fetch the Docker images used by a tool via arv_docker_get_image.

    For a CommandLineTool, the image described by its DockerRequirement is
    requested, or "arvados/jobs:<version>" when the tool has none.  For a
    Workflow, every step's embedded tool is processed recursively.  Any
    other kind of object is ignored.

    Raises UnsupportedRequirement (via SourceLine.makeError) when the
    DockerRequirement sets 'dockerOutputDirectory' and the work API is not
    "containers".
    """

    if isinstance(tool, CommandLineTool):
        docker_req, _ = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            image_spec = docker_req
        else:
            # No DockerRequirement: fall back to the arvados/jobs image
            # matching this package version.
            image_spec = {"dockerPull": "arvados/jobs:"+__version__}
        arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, image_spec,
                                                   True, arvrunner.project_uuid,
                                                   arvrunner.runtimeContext.force_docker_pull,
                                                   arvrunner.runtimeContext.tmp_outdir_prefix)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
472
473
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been
    combined into a single document.  After packing, the document is
    walked to:

      * turn 'path' into 'location' on objects that have no 'location',
      * apply the resolved locations and secondaryFiles recorded in
        merged_map for the process the object belongs to,
      * record the result of arv_docker_get_image on every
        DockerRequirement under
        "http://arvados.org/cwl#dockerCollectionPDH".

    Returns the packed document.
    """

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    # Invert the rewrite table filled in by pack() so ids found in the
    # packed document can be mapped back to merged_map keys.
    original_ids = {new: old for old, new in viewitems(rewrites)}

    process_classes = ("CommandLineTool", "Workflow", "ExpressionTool")

    def visit(node, cur_id):
        if isinstance(node, dict):
            if node.get("class") in process_classes:
                has_id = "id" in node
                if tool.metadata["cwlVersion"] == "v1.0" and not has_id:
                    raise SourceLine(node, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
                if has_id:
                    # Everything below this node belongs to this process.
                    cur_id = original_ids.get(node["id"], node["id"])

            if "path" in node and "location" not in node:
                node["location"] = node.pop("path")

            if "location" in node and cur_id in merged_map:
                updates = merged_map[cur_id]
                if node["location"] in updates.resolved:
                    node["location"] = updates.resolved[node["location"]]
                if node["location"] in updates.secondaryFiles:
                    node["secondaryFiles"] = updates.secondaryFiles[node["location"]]

            if node.get("class") == "DockerRequirement":
                node["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, node, True,
                    arvrunner.project_uuid,
                    arvrunner.runtimeContext.force_docker_pull,
                    arvrunner.runtimeContext.tmp_outdir_prefix)

            for key in node:
                visit(node[key], cur_id)
        elif isinstance(node, list):
            for item in node:
                visit(item, cur_id)

    visit(packed, None)
    return packed
513
514
def tag_git_version(packed, tool=None):
    """Record the git commit of a local workflow in the packed document.

    If the id of 'tool' is a "file://" URL, run "git log" in the directory
    containing that file and store the hash of the most recent
    first-parent commit in packed["http://schema.org/version"].

    Arguments:
      packed: the packed workflow document (a dict); updated in place.
      tool: object whose tool["id"] gives the workflow location.  This
        function previously read an undefined name 'tool' and so could
        only raise NameError; the tool is now an explicit, optional
        argument.  When it is None, nothing is done.

    This is best-effort: if the id is not a file:// URL, or git cannot be
    run or fails (for example the directory is not a git checkout),
    'packed' is left unchanged and no exception is raised.
    """
    if tool is None:
        return

    tool_id = tool.tool["id"]
    if not tool_id.startswith("file://"):
        return

    # Strip the "file://" prefix to get the local path.
    path = os.path.dirname(tool_id[7:])
    try:
        githash = subprocess.check_output(
            ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
            stderr=subprocess.STDOUT, cwd=path).strip()
    except (OSError, subprocess.CalledProcessError):
        return

    # check_output returns bytes on Python 3, which cannot be serialized
    # to JSON; store the hash as text.
    if isinstance(githash, bytes):
        githash = githash.decode("utf-8")
    packed["http://schema.org/version"] = githash
524
525
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object.

    Secondary files are discovered for the inputs first, then the input
    object's dependencies are uploaded with upload_dependencies(), which
    updates each 'location' in place.  The "id" and "job_order" keys are
    removed from job_order before it is returned.

    Returns the same job_order object that was passed in.
    """

    # Expressions are evaluated against a copy of the job order that has
    # the tool's defaults filled in.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want it to do that, so make every input
    # optional in a private copy of the input schemas.
    optional_inputs = copy.deepcopy(tool.tool["inputs"])
    for inp in optional_inputs:
        if "null" in inp["type"]:
            continue
        inp["type"] = ["null"] + aslist(inp["type"])

    fill_in_defaults(optional_inputs, builder_job_order, arvrunner.fs_access)

    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)

    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access, builder,
                             tool.tool["inputs"], job_order)

    # Called for its in-place update of job_order; the returned path
    # mapper is not needed here.
    upload_dependencies(arvrunner, name, tool.doc_loader,
                        job_order, job_order.get("id", "#"), False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
573
# Per-tool record built by upload_workflow_deps(): 'resolved' maps original
# locations to their uploaded locations, 'secondaryFiles' holds the
# secondary files discovered for the tool's inputs.
FileUpdates = namedtuple("FileUpdates", ("resolved", "secondaryFiles"))
575
def upload_workflow_deps(arvrunner, tool):
    """Upload the dependencies of every tool in a workflow.

    After fetching Docker images with upload_docker(), visits each
    document in 'tool' that has an "id", uploads its dependencies with
    upload_dependencies() (without the document itself), and stores the
    updated document back into the loader's index.

    Returns a dict mapping each visited document id to a FileUpdates
    tuple holding its resolved locations and discovered secondary files.
    """
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader
    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" not in deptool:
            return

        tool_id = deptool["id"]
        secondary = {}
        pathmapper = upload_dependencies(arvrunner,
                                         "%s dependencies" % (shortname(tool_id)),
                                         document_loader,
                                         deptool,
                                         tool_id,
                                         False,
                                         include_primary=False,
                                         discovered_secondaryfiles=secondary)
        document_loader.idx[tool_id] = deptool
        resolved = {src: entry.resolved for src, entry in pathmapper.items()}
        merged_map[tool_id] = FileUpdates(resolved, secondary)

    tool.visit(upload_tool_deps)

    return merged_map
605
def arvados_jobs_image(arvrunner, img):
    """Look up the arvados/jobs image 'img' with arv_docker_get_image.

    Returns whatever arv_docker_get_image returns.  Any exception raised
    during the lookup is replaced by a generic Exception whose message
    names the image and includes the original error text.
    """

    image_spec = {"dockerPull": img}
    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(
            arvrunner.api, image_spec, True, arvrunner.project_uuid,
            arvrunner.runtimeContext.force_docker_pull,
            arvrunner.runtimeContext.tmp_outdir_prefix)
    except Exception as err:
        raise Exception("Docker image %s is not available\n%s" % (img, err) )
615
616
def upload_workflow_collection(arvrunner, name, packed):
    """Store a packed workflow as "workflow.cwl" in a collection.

    The document is serialized as sorted, indented JSON.  If a collection
    with the same portable data hash and a name starting with 'name'
    already exists (restricted to arvrunner.project_uuid when that is
    set), nothing new is saved; otherwise a new collection is saved under
    'name' with a unique-name guarantee.

    Returns the portable data hash of the collection.
    """
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])

    existing = arvrunner.api.collections().list(
        filters=filters).execute(num_retries=arvrunner.num_retries)["items"]

    if not existing:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())
    else:
        logger.info("Using collection %s", existing[0]["uuid"])

    return collection.portable_data_hash()
640
641
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):
        """Set up the runner process.

        The Process base class is initialized from updated_tool (its tool
        document and a copy of its metadata), while 'tool' is kept as
        self.embedded_tool and consulted for the ReuseRequirement and
        WorkflowRunnerResources extensions.

        Raises Exception if the resulting runner RAM or core count is not
        greater than zero.
        """

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False

        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse

        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        # Defaults, possibly overridden by the workflow's
        # WorkflowRunnerResources below.
        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            cores_min = runner_resource_req.get("coresMin")
            if cores_min:
                self.submit_runner_cores = cores_min

            ram_min = runner_resource_req.get("ramMin")
            if ram_min:
                self.submit_runner_ram = ram_min

            # The workflow's cache size only applies when the caller did
            # not choose one explicitly.
            keep_cache = runner_resource_req.get("keep_cache")
            if keep_cache and collection_cache_is_default:
                self.collection_cache_size = keep_cache

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self, job_order, output_callbacks, runtimeContext):
        """Remember the job order, initialize the job, and yield this runner.

        job_order is the input object, output_callbacks is not used here,
        and runtimeContext is passed to _init_job().  This is a generator
        that yields exactly one item: self.
        """
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        """Do nothing; placeholder that accepts a record."""
        pass

    def done(self, record):
        """Base method for handling a completed runner.

        Derives the process status from the record's "state" and
        "exit_code", logs the tail of the log collection on failure,
        reads "cwl.output.json" from the output collection, and makes the
        locations in the outputs absolute keep references.  The result is
        reported through self.arvrunner.output_callback(); if anything
        goes wrong it is called with an empty output and "permanentFail".
        """

        try:
            if record["state"] != "Complete":
                processStatus = "permanentFail"
            else:
                exit_code = record.get("exit_code")
                if exit_code is None or exit_code == 0:
                    processStatus = "success"
                elif exit_code == 33:
                    processStatus = "UnsupportedRequirement"
                else:
                    processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                log_reader = arvados.collection.CollectionReader(record["log"],
                                                                 api_client=self.arvrunner.api,
                                                                 keep_client=self.arvrunner.keep_client,
                                                                 num_retries=self.arvrunner.num_retries)
                done.logtail(log_reader, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            output_reader = arvados.collection.CollectionReader(self.final_output,
                                                                api_client=self.arvrunner.api,
                                                                keep_client=self.arvrunner.keep_client,
                                                                num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in output_reader:
                with output_reader.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                # Locations that are not keep references are taken to be
                # relative to the output collection.
                location = fileobj["location"]
                if not location.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], location)

            for adjust in (adjustFileObjs, adjustDirObjs):
                adjust(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)