# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals to not have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

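# Example (illustrative sketch, not executed at import): a File literal that
# was assigned an anonymous "_:" id loses its 'location', while a real
# reference is left untouched:
#
#     literal = {"class": "File", "location": "_:1234", "basename": "a.txt"}
#     trim_anonymous_location(literal)   # 'location' is removed
#
#     ref = {"class": "File", "location": "file:///tmp/a.txt"}
#     trim_anonymous_location(ref)       # unchanged
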
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

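# Example (illustrative sketch): find_defaults applies `op` to every dict
# that carries a "default" key, at any nesting depth:
#
#     seen = []
#     find_defaults({"inputs": [{"id": "x", "default": {"class": "File"}}]},
#                   seen.append)
#     # seen == [{"id": "x", "default": {"class": "File"}}]
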
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],               # type: List[Dict[Text, Text]]
                   bindings=[],            # type: List[Dict[Text, Any]]
                   schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                   names=None,             # type: Names
                   requirements=requirements,  # type: List[Dict[Text, Any]]
                   hints=hints,            # type: List[Dict[Text, Any]]
                   resources={},           # type: Dict[str, int]
                   mutation_manager=None,  # type: Optional[MutationManager]
                   formatgraph=None,       # type: Optional[Graph]
                   make_fs_access=None,    # type: Type[StdFsAccess]
                   fs_access=None,         # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,  # type: float
                   debug=runtimeContext.debug,  # type: bool
                   js_console=runtimeContext.js_console,  # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                   loadListing="",         # type: Text
                   outdir="",              # type: Text
                   tmpdir="",              # type: Text
                   stagedir="",            # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                  )

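# Example (illustrative sketch, assuming a populated ArvRuntimeContext): the
# Builder returned here is only used to evaluate CWL expressions, e.g.
# resolving a secondaryFiles pattern against a primary File object:
#
#     builder = make_builder({}, hints=[], requirements=[],
#                            runtimeContext=ArvRuntimeContext(),
#                            metadata={"cwlVersion": "v1.1"})
#     pattern = builder.do_eval('$(self.basename).bai',
#                               context={"class": "File", "basename": "a.bam"})
#     # pattern == "a.bam.bai"
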
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

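# Example (illustrative sketch): secondaryFiles patterns are expanded with
# cwltool's substitute(); a leading "^" strips one extension from the
# primary location before appending the suffix:
#
#     substitute("file:///data/reads.bam", ".bai")   # file:///data/reads.bam.bai
#     substitute("file:///data/reads.bam", "^.bai")  # file:///data/reads.bai
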
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

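# Example (illustrative sketch): after upload_dependencies() returns, local
# and UUID-style references in workflowobj have been rewritten in place to
# portable data hash (PDH) keep references, e.g.
#
#     "file:///home/user/wf/script.py"
#         -> "keep:<pdh>/script.py"
#     "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/input.txt"
#         -> "keep:<pdh>/input.txt"   (with the uuid kept under collectionUUID)
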
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             arvrunner.project_uuid,
                                                                                                             arvrunner.runtimeContext.force_docker_pull,
                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix,
                                                                                                             arvrunner.runtimeContext.match_local_docker)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)

    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash

    return packed

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

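# Example (illustrative sketch) of the structure built by
# upload_workflow_deps() below: one FileUpdates entry per tool document,
# keyed by the tool's document id:
#
#     merged_map = {
#         "file:///home/user/wf/tool.cwl": FileUpdates(
#             resolved={"file:///home/user/wf/script.py": "keep:<pdh>/script.py"},
#             secondaryFiles={}),
#     }
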
def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
                                                          arvrunner.runtimeContext.match_local_docker)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

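# Example (illustrative): collections are content-addressed, so uploading the
# same packed workflow twice yields the same portable data hash, and the
# second call finds and reuses the existing collection via the
# portable_data_hash filter instead of saving a duplicate:
#
#     pdh = upload_workflow_collection(arvrunner, "my-workflow", packed)
#     pdh == upload_workflow_collection(arvrunner, "my-workflow", packed)  # True
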
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
726 """Base method for handling a completed runner."""
729 if record["state"] == "Complete":
730 if record.get("exit_code") is not None:
731 if record["exit_code"] == 33:
732 processStatus = "UnsupportedRequirement"
733 elif record["exit_code"] == 0:
734 processStatus = "success"
736 processStatus = "permanentFail"
738 processStatus = "success"
740 processStatus = "permanentFail"
744 if processStatus == "permanentFail":
745 logc = arvados.collection.CollectionReader(record["log"],
746 api_client=self.arvrunner.api,
747 keep_client=self.arvrunner.keep_client,
748 num_retries=self.arvrunner.num_retries)
749 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
751 self.final_output = record["output"]
752 outc = arvados.collection.CollectionReader(self.final_output,
753 api_client=self.arvrunner.api,
754 keep_client=self.arvrunner.keep_client,
755 num_retries=self.arvrunner.num_retries)
756 if "cwl.output.json" in outc:
757 with outc.open("cwl.output.json", "rb") as f:
759 outputs = json.loads(f.read().decode())
760 def keepify(fileobj):
761 path = fileobj["location"]
762 if not path.startswith("keep:"):
763 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
764 adjustFileObjs(outputs, keepify)
765 adjustDirObjs(outputs, keepify)
767 logger.exception("[%s] While getting final output object", self.name)
768 self.arvrunner.output_callback({}, "permanentFail")
770 self.arvrunner.output_callback(outputs, processStatus)