# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy
from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, just remove it.

    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

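# Illustrative sketch (not part of the original module; hypothetical values):
# a File literal assigned an anonymous id loses its synthetic 'location'
# in place:
#
#     f = {"class": "File", "location": "_:9e0e2c", "basename": "a.txt",
#          "contents": "hello"}
#     trim_anonymous_location(f)
#     # f is now {"class": "File", "basename": "a.txt", "contents": "hello"}
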
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

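# Illustrative sketch (hypothetical input): find_defaults() walks nested
# lists and dicts and calls op() on each dict carrying a "default" key, so
#
#     doc = {"inputs": [{"id": "x", "default": {"class": "File",
#                                               "path": "/tmp/a.txt"}}]}
#     find_defaults(doc, print)
#
# would print the single input dict that has a "default".
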
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],               # type: List[Dict[Text, Text]]
                   bindings=[],            # type: List[Dict[Text, Any]]
                   schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                   names=None,             # type: Names
                   requirements=requirements,  # type: List[Dict[Text, Any]]
                   hints=hints,            # type: List[Dict[Text, Any]]
                   resources={},           # type: Dict[str, int]
                   mutation_manager=None,  # type: Optional[MutationManager]
                   formatgraph=None,       # type: Optional[Graph]
                   make_fs_access=None,    # type: Type[StdFsAccess]
                   fs_access=None,         # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,  # type: float
                   debug=runtimeContext.debug,  # type: bool
                   js_console=runtimeContext.js_console,  # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                   loadListing="",         # type: Text
                   outdir="",              # type: Text
                   tmpdir="",              # type: Text
                   stagedir="",            # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                   )

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

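# Illustrative sketch (hypothetical values): with cwltool's substitute(),
# a secondaryFiles pattern is appended to the primary location, and each
# leading "^" strips one extension first, e.g.
#
#     substitute("keep:abc+123/reads.bam", ".bai")   # keep:abc+123/reads.bam.bai
#     substitute("keep:abc+123/reads.bam", "^.bai")  # keep:abc+123/reads.bai
#
# set_secondary() records each existing match as
# {"location": <path>, "class": "File"} on the primary's "secondaryFiles".
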
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get the raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

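# Illustrative sketch (hypothetical identifiers): a caller typically maps a
# local document's file references into Keep and then reads back the
# rewritten locations, e.g.
#
#     mapper = upload_dependencies(arvrunner, "my-workflow dependencies",
#                                  tool.doc_loader, tool.tool,
#                                  tool.tool["id"], False)
#     resolved = mapper.mapper("file:///home/user/data.txt").resolved
#     # resolved is now a "keep:<pdh>/data.txt" style reference
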
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
            if "id" in v:
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             arvrunner.project_uuid,
                                                                                                             arvrunner.runtimeContext.force_docker_pull,
                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix,
                                                                                                             arvrunner.runtimeContext.match_local_docker)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)

    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash
    tag_git_version(packed)

    return packed

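# Illustrative sketch (hypothetical values): merged_map is keyed by tool
# document id, and its FileUpdates entries (defined below) carry the
# location rewrites, e.g.
#
#     merged_map = {"file:///wf/tool.cwl": FileUpdates(
#         resolved={"file:///wf/script.py": "keep:abc+123/script.py"},
#         secondaryFiles={})}
#
# so visit() replaces "file:///wf/script.py" inside the packed document
# with the Keep reference.
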
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want it to do that,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

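# Illustrative sketch (hypothetical paths): upload_job_order() rewrites the
# input object in place, so
#
#     job_order = {"reads": {"class": "File",
#                            "location": "file:///home/user/reads.bam"}}
#     job_order = upload_job_order(arvrunner, "my-run input", tool, job_order)
#     # job_order["reads"]["location"] is now a "keep:<pdh>/reads.bam" style
#     # reference, with any required secondaryFiles discovered and attached.
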
def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
                                                          arvrunner.runtimeContext.match_local_docker)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

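# Illustrative sketch (hypothetical call): callers pass the image name with
# its tag and get back the resolved image identifier, or an exception if it
# cannot be pulled and uploaded:
#
#     pdh = arvados_jobs_image(arvrunner, "arvados/jobs:" + __version__)
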
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

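# Illustrative sketch (hypothetical names): the returned portable data hash
# can be used directly as a "keep:" reference to the stored workflow:
#
#     pdh = upload_workflow_collection(arvrunner, "my-workflow", packed)
#     location = "keep:%s/workflow.cwl" % pdh
#
# Because the lookup filters on portable_data_hash first, re-uploading
# identical content reuses the existing collection rather than creating a
# duplicate.
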
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
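
# Illustrative sketch (hypothetical values): keepify() qualifies relative
# output paths against the runner's output collection, e.g. with
# record["output"] == "a1b2c3+57", a file object
# {"class": "File", "location": "out.txt"} becomes
# {"class": "File", "location": "keep:a1b2c3+57/out.txt"}, while locations
# already starting with "keep:" are left untouched.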