# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess
from schema_salad.sourceline import SourceLine, cmap
import schema_salad.validate as validate

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute, Builder
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
import arvados.collection
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')


def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals to not have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
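
# Example (illustrative): a File literal whose 'location' was assigned an
# anonymous "_:" id during loading loses that field, so the record
# round-trips reproducibly:
#
#     literal = {"class": "File", "location": "_:d5f4ae9c", "basename": "in.txt"}
#     trim_anonymous_location(literal)
#     # literal == {"class": "File", "basename": "in.txt"}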


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
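
# Example (illustrative): find_defaults() walks nested lists/dicts and calls
# `op` on every mapping that carries a "default" key:
#
#     seen = []
#     find_defaults({"inputs": [{"id": "x",
#                                "default": {"class": "File", "path": "a.txt"}}]},
#                   seen.append)
#     # seen == [{"id": "x", "default": {"class": "File", "path": "a.txt"}}]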


def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
                 job=joborder,
                 files=[],                  # type: List[Dict[Text, Text]]
                 bindings=[],               # type: List[Dict[Text, Any]]
                 schemaDefs={},             # type: Dict[Text, Dict[Text, Any]]
                 names=None,                # type: Names
                 requirements=requirements, # type: List[Dict[Text, Any]]
                 hints=hints,               # type: List[Dict[Text, Any]]
                 resources={},              # type: Dict[str, int]
                 mutation_manager=None,     # type: Optional[MutationManager]
                 formatgraph=None,          # type: Optional[Graph]
                 make_fs_access=None,       # type: Type[StdFsAccess]
                 fs_access=None,            # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,                     # type: float
                 debug=runtimeContext.debug,                              # type: bool
                 js_console=runtimeContext.js_console,                    # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
                 loadListing="",            # type: Text
                 outdir="",                 # type: Text
                 tmpdir="",                 # type: Text
                 stagedir="",               # type: Text
                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                 container_engine="docker"
                )
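
# Sketch of intended use (illustrative): the Builder returned above exists
# only to evaluate parameter references and expressions, e.g.
#
#     builder = make_builder({"threads": 4}, hints, requirements,
#                            runtimeContext, tool.metadata)
#     builder.do_eval("$(inputs.threads)", context=None)   # -> 4
#
# (full JavaScript expressions additionally require a JS engine at runtime).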


def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None


primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))


def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)
                if sfname is None:
                    continue

                p_location = primary["location"]
                if "/" in p_location:
                    sfpath = (
                        p_location[0 : p_location.rindex("/") + 1]
                        + sfname)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
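
# Example (illustrative): given an input schema entry
#
#     {"type": "File", "secondaryFiles": [{"pattern": ".bai", "required": True}]}
#
# and primary == {"class": "File", "location": ".../reads.bam", "basename": "reads.bam"},
# substitute() appends the pattern ("reads.bam" -> "reads.bam.bai"); a leading
# "^" in a pattern strips the primary's extension first ("reads.bam" -> "reads.bai").
# Files that exist are recorded in primary["secondaryFiles"] and, when a
# `discovered` dict is supplied, under discovered[primary["location"]].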


def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)


def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
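
# Sketch of a typical call (illustrative; `tool` is a loaded cwltool Process):
#
#     mapper = upload_dependencies(arvrunner,
#                                  "mytool.cwl dependencies",
#                                  tool.doc_loader,
#                                  tool.tool,
#                                  tool.tool["id"],
#                                  False)
#
# Afterwards, File/Directory references in tool.tool point at
# "keep:PDH/path" locations (PDH = portable data hash) resolvable by
# any node in the cluster.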


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError(
                        "Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, v, True,
                    arvrunner.project_uuid,
                    arvrunner.runtimeContext.force_docker_pull,
                    arvrunner.runtimeContext.tmp_outdir_prefix,
                    arvrunner.runtimeContext.match_local_docker)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(
                ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
                stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash
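
# Illustrative effect (assuming the workflow file lives in a git checkout):
# `git log --first-parent --max-count=1 --format=%H` prints the HEAD commit,
# so on success the packed document gains
#
#     packed["http://schema.org/version"]   # e.g. b"9f26a8c..." (bytes on Python 3)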


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want that, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order


FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
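
# Illustrative shape of a merged_map entry built below (paths hypothetical):
#
#     merged_map["file:///home/me/wf.cwl"] = FileUpdates(
#         resolved={"file:///home/me/script.py": "keep:PDH/script.py"},
#         secondaryFiles={"keep:PDH/reads.bam":
#                         [{"class": "File", "location": "keep:PDH/reads.bam.bai"}]})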


def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map


def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
                                                          arvrunner.runtimeContext.match_local_docker)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
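
# Note: Keep collections are content-addressed, so the portable_data_hash
# filter above finds any existing copy of this exact "workflow.cwl"; a new
# collection is saved only when no match exists in the target project.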


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
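
    # Illustrative workflow hint that drives the WorkflowRunnerResources
    # handling above (assuming the "arv" namespace prefix is mapped to
    # http://arvados.org/cwl# in the workflow document):
    #
    #     hints:
    #       arv:WorkflowRunnerResources:
    #         coresMin: 2
    #         ramMin: 2048      # MiB reserved for the runner container
    #         keep_cache: 512   # MiB of Keep block cache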

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass
    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
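            # Illustrative: with record["output"] == "9f26...+57", a relative
            # location "foo/out.txt" becomes "keep:9f26...+57/foo/out.txt".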
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)