# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring
import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence
if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess
from schema_salad.sourceline import SourceLine, cmap
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
import arvados.collection
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq
import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext
logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]
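
# Illustrative sketch (these helpers are normally applied via cwltool's
# adjustFileObjs/visit_class rather than called directly):
#
#   obj = {"class": "File", "location": "_:0b4d...", "basename": "a.txt",
#          "path": "/tmp/a.txt", "nameroot": "a", "nameext": ".txt"}
#   trim_anonymous_location(obj)   # drops the anonymous "_:..." location
#   remove_redundant_fields(obj)   # drops path/nameext/nameroot/dirname
#   # obj is now {"class": "File", "basename": "a.txt"}
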
def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
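
# For example, given a tool document fragment such as
#   {"inputs": [{"id": "#main/x", "type": "File",
#                "default": {"class": "File", "location": "blub.txt"}}]}
# find_defaults(doc, op) invokes op() once, on the {"id": "#main/x", ...}
# dict, since that is the nearest dict carrying a "default" key.
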
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,             # type: Names
                 requirements=requirements,  # type: List[Dict[Text, Any]]
                 hints=hints,            # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,  # type: Optional[MutationManager]
                 formatgraph=None,       # type: Optional[Graph]
                 make_fs_access=None,    # type: Type[StdFsAccess]
                 fs_access=None,         # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,  # type: float
                 debug=runtimeContext.debug,  # type: bool
                 js_console=runtimeContext.js_console,  # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                 container_engine="docker"
                )

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
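
# A sketch of the pattern expansion performed above, using cwltool's
# substitute(): with primary {"class": "File", "location": "keep:.../reads.bam"}
# and a secondaryFiles pattern of ".bai", the candidate secondary file is
# ".../reads.bam.bai".  A leading "^" strips one extension per caret, so
# "^.fa" applied to "reads.bam" yields ".../reads.fa".
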
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))
    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)
    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)
    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    #if "$schemas" in workflowobj:
    #    for s in workflowobj["$schemas"]:
    #        sc.append({"class": "File", "location": s})
    def visit_default(obj):
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            optional_deps.append(f)
            #if "location" in f and not arvrunner.fs_access.exists(f["location"]):
            #    # Doesn't exist, remove from list of dependencies to upload
            #    sc[:] = [x for x in sc if x["location"] != f["location"]]
            #    # Delete "default" from workflowobj
            #    remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)

    find_defaults(workflowobj, visit_default)
    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid
    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
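
# Typical call pattern, as used by upload_workflow_deps() below (a sketch;
# "arvrunner" is the executor object carrying .api, .fs_access, etc.):
#
#   mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                False, include_primary=False)
#   mapper.mapper("file:///home/me/input.txt").resolved
#   # -> "keep:<portable data hash>/input.txt"
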
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document.
    """

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, v, True,
                    arvrunner.project_uuid,
                    arvrunner.runtimeContext.force_docker_pull,
                    arvrunner.runtimeContext.tmp_outdir_prefix,
                    arvrunner.runtimeContext.match_local_docker)
            for d in v:
                visit(v[d], cur_id)
        if isinstance(v, list):
            for d in v:
                visit(d, cur_id)

    visit(packed, None)
    return packed

def tag_git_version(packed, tool):
    # Takes the tool explicitly (the original referenced an undefined free
    # variable) so the helper can be called with the workflow being packed.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
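
# One FileUpdates entry is recorded per tool document: "resolved" maps
# original file locations to their keep: URIs, and "secondaryFiles" maps a
# primary file location to the secondaryFiles discovered for it.  For
# example (illustrative values):
#
#   FileUpdates(resolved={"file:///wf/script.py": "keep:99999.../script.py"},
#               secondaryFiles={})
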
def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map
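
# A minimal sketch of how the helpers above fit together at submit time
# (the real call sites are in the executor, outside this module):
#
#   merged_map = upload_workflow_deps(arvrunner, tool)     # files -> keep:
#   packed = packed_workflow(arvrunner, tool, merged_map)  # one document
#   pdh = upload_workflow_collection(arvrunner, "my workflow", packed)
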
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
                                                          arvrunner.runtimeContext.match_local_docker)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
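
        # The WorkflowRunnerResources hint consulted above looks roughly
        # like this in a workflow document (illustrative values; the "arv:"
        # prefix assumes the usual http://arvados.org/cwl# namespace
        # declaration; ramMin and keep_cache are in MiB):
        #
        #   hints:
        #     arv:WorkflowRunnerResources:
        #       coresMin: 2
        #       ramMin: 2048
        #       keep_cache: 512
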
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield
    def update_pipeline_component(self, record):
        pass
    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"
            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
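
            # keepify example: if record["output"] is the PDH
            # "99999999999999999999999999999999+99", a relative location
            # "foo/bar.txt" in cwl.output.json is rewritten to
            # "keep:99999999999999999999999999999999+99/foo/bar.txt".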
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)