# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
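    """Strip 'path', 'nameext', 'nameroot' and 'dirname' from a File or Directory object."""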
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
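    """Walk a document tree and apply op() to each dict that has a 'default' field."""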
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext):
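    """Construct a minimal cwltool Builder object, used here to evaluate
    expressions such as secondaryFiles patterns."""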
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,             # type: Names
                 requirements=requirements,  # type: List[Dict[Text, Any]]
                 hints=hints,            # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,  # type: Optional[MutationManager]
                 formatgraph=None,       # type: Optional[Graph]
                 make_fs_access=None,    # type: Type[StdFsAccess]
                 fs_access=None,         # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,  # type: float
                 debug=runtimeContext.debug,           # type: bool
                 js_console=runtimeContext.js_console,  # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                )

def search_schemadef(name, reqs):
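    """Look up a named type in the SchemaDefRequirement entries of the given requirements/hints list."""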
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

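# Type names that set_secondary() treats as terminal, i.e. not a reference to
# a SchemaDefRequirement type that needs to be resolved.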
primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
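    """Recursively walk an input schema alongside the corresponding value in
    the job order, evaluating secondaryFiles patterns and attaching any
    secondary files that exist to File objects that don't already declare
    them.  Newly found secondary files are also recorded in 'discovered',
    keyed by the primary file's location.
    """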
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if isinstance(inputschema["type"], Mapping):
        # compound type (array or record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        primary["secondaryFiles"] = []
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(sf["required"], context=primary)

            if fsaccess.exists(sfpath):
                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
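    """For each input parameter, look up its value in the job order and use
    set_secondary() to fill in any declared secondaryFiles that exist."""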
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

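# Patterns for keep references: by collection uuid ("keep:zzzzz-4zz18-...") or
# by portable data hash ("keep:<md5>+<size>"), each optionally followed by a
# path within the collection.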
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext())
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
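        # Rewrite the location of a File/Directory to a keep: reference,
        # either via the pathmapper (for uploaded local files) or by
        # resolving a collection uuid to its portable data hash.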
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
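        # Walk the packed document: track which tool each node came from (via
        # rewrite_to_orig), rewrite file locations to keep: references using
        # merged_map, attach discovered secondaryFiles, and record the Docker
        # image portable data hash on DockerRequirement entries.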
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
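    """Annotate the packed workflow with the git commit hash of the directory
    containing the tool, when the tool id refers to a local file."""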
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return an updated
    input object whose 'location' fields point to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)
    fill_in_defaults(tool.tool["inputs"],
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext())
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

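# FileUpdates pairs, for a single tool document, a mapping of original file
# locations to uploaded keep: locations ('resolved') with the secondaryFiles
# discovered for those files ('secondaryFiles').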
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
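    """Upload the file dependencies and Docker images of every tool in the
    workflow, returning a map from tool id to the FileUpdates describing how
    its references were rewritten."""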
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
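    """Save the packed workflow as workflow.cwl in a Keep collection, reusing
    an existing collection with the same content if one is found, and return
    its portable data hash."""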
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = loadingContext.metadata.copy()
        loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
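        """Record the job order, initialize job state, and yield this Runner as the single runnable item."""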
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)