# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import copy
import json
import logging
import os
import sys
import urllib.parse

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
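
# Usage sketch (hypothetical data): find_defaults() calls `op` on every dict
# in the tree that carries a "default" key, e.g.
#   find_defaults({"inputs": [{"id": "x", "default": {"class": "File", "path": "a.txt"}}]},
#                 lambda d: print(d["default"]))
# prints the File literal. visit_default() in upload_dependencies() below uses
# this to upload (or prune) File/Directory defaults embedded in a workflow.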

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],               # type: List[Dict[Text, Text]]
                   bindings=[],            # type: List[Dict[Text, Any]]
                   schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                   names=None,             # type: Names
                   requirements=requirements, # type: List[Dict[Text, Any]]
                   hints=hints,            # type: List[Dict[Text, Any]]
                   resources={},           # type: Dict[str, int]
                   mutation_manager=None,  # type: Optional[MutationManager]
                   formatgraph=None,       # type: Optional[Graph]
                   make_fs_access=None,    # type: Type[StdFsAccess]
                   fs_access=None,         # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout, # type: float
                   debug=runtimeContext.debug, # type: bool
                   js_console=runtimeContext.js_console, # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull, # type: bool
                   loadListing="",         # type: Text
                   outdir="",              # type: Text
                   tmpdir="",              # type: Text
                   stagedir="",            # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                  )
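
# The Builder constructed above is deliberately minimal: only the fields
# needed to call builder.do_eval() on expressions (e.g. secondaryFiles
# patterns) are populated; staging fields like outdir/tmpdir are left blank.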

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
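
# Worked example (hypothetical paths): given an input schema
#   {"type": "File", "secondaryFiles": [{"pattern": ".bai", "required": True}]}
# and a primary {"class": "File", "location": "keep:.../reads.bam"},
# substitute() yields "keep:.../reads.bam.bai"; if fsaccess.exists() confirms
# it, primary["secondaryFiles"] becomes
#   [{"location": "keep:.../reads.bam.bai", "class": "File"}]
# and the same list is recorded in `discovered`, keyed by the primary location.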

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
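
# discover_secondary_files() is the entry point used below: it walks each
# input schema against the corresponding value in job_order and fills in
# secondaryFiles, recording any newly found files in `discovered`.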

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
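
# Usage sketch (hypothetical values): after upload_dependencies() runs, local
# references in workflowobj have been rewritten in place, and the returned
# mapper can translate any of the scanned paths, e.g.
#   mapper.mapper("file:///home/me/blast.cwl").resolved
#   => "keep:99999999999999999999999999999999+118/blast.cwl"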

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
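
# Workflows are handled by recursing into each step's embedded tool, so every
# Docker image in a nested workflow tree is uploaded before submission.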

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, v, True,
                    arvrunner.project_uuid,
                    arvrunner.runtimeContext.force_docker_pull,
                    arvrunner.runtimeContext.tmp_outdir_prefix,
                    arvrunner.runtimeContext.match_local_docker)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)

    return packed

def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip().decode('utf-8')
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order


FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
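
# FileUpdates.resolved maps original file locations to their uploaded keep
# references; FileUpdates.secondaryFiles maps resolved locations to the
# secondaryFiles discovered for them. upload_workflow_deps() below builds one
# FileUpdates per tool document, keyed by the document id.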

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
                                                          arvrunner.runtimeContext.match_local_docker)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
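
# Usage sketch (call sites are not in this file): submit-time code is expected
# to call arvados_jobs_image(arvrunner, runner.jobs_image) to make sure the
# arvados/jobs runner image is present before submitting a runner container.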

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
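
# Collections are deduplicated by content: if a collection with the same
# portable data hash and a matching name already exists (optionally within the
# target project), it is reused instead of saving a new copy.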

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)