# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    # Use the subprocess32 backport on Python 2 for timeout support.
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
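
# Example (illustrative, not from the original source): a File literal such as
# {"class": "File", "location": "_:0b2...", "basename": "greeting.txt", "contents": "hello"}
# has its anonymous "_:..." location stripped by trim_anonymous_location() so the
# literal round-trips reproducibly.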

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,             # type: Names
                 requirements=requirements,  # type: List[Dict[Text, Any]]
                 hints=hints,            # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,  # type: Optional[MutationManager]
                 formatgraph=None,       # type: Optional[Graph]
                 make_fs_access=None,    # type: Type[StdFsAccess]
                 fs_access=None,         # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,           # type: float
                 debug=runtimeContext.debug,                    # type: bool
                 js_console=runtimeContext.js_console,          # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                 container_engine="docker"
                )
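
# How this module uses make_builder (orientation only, mirroring the call sites in
# discover_default_secondary_files and upload_job_order below): a throwaway Builder
# is created purely to evaluate CWL expressions against a job order, e.g.
#
#   builder = make_builder(builder_job_order,
#                          tool.hints,
#                          tool.requirements,
#                          ArvRuntimeContext(),
#                          tool.metadata)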

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
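
# Example (illustrative, not from the original source): for a File input
# {"class": "File", "location": "keep:.../sample.bam"} whose schema declares
# secondaryFiles with pattern ".bai", set_secondary() evaluates the pattern,
# checks via fsaccess that "sample.bam.bai" exists, and records it both on the
# primary File and in the `discovered` dict keyed by the primary's location.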

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until it has nothing more to return.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
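
# Sketch of a typical call, mirroring upload_job_order and upload_workflow_deps
# below (orientation only, not additional API):
#
#   pm = upload_dependencies(arvrunner, name, tool.doc_loader,
#                            job_order, job_order.get("id", "#"), False)
#
# The returned ArvPathMapper translates each local path to its "keep:..." reference.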

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or upgrade to cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, v, True,
                    arvrunner.project_uuid,
                    arvrunner.runtimeContext.force_docker_pull,
                    arvrunner.runtimeContext.tmp_outdir_prefix,
                    arvrunner.runtimeContext.match_local_docker)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed
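
# The packed document returned above is suitable for storing as a workflow record;
# in this module it can be written to Keep with upload_workflow_collection() below,
# which takes the packed document as its `packed` argument. (Orientation comment only.)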

def tag_git_version(packed):
    # NOTE: this helper references a `tool` name that is not defined in this scope.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want that, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map
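
# Orientation comment (not in the original source): the merged_map returned above is
# keyed by each tool/workflow document id; every FileUpdates entry carries the
# local-path -> "keep:..." resolutions plus discovered secondaryFiles, and is later
# consumed by packed_workflow() when it rewrites "location" fields.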

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
                                                          arvrunner.runtimeContext.match_local_docker)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
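
# Illustrative use (not from the original source): callers generally pass the image
# chosen for the runner container, e.g.
#
#   arvados_jobs_image(arvrunner, "arvados/jobs:" + __version__)
#
# matching the default Runner.jobs_image set below.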

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    # Exit code 33 is the runner's convention for signalling an
                    # unsupported requirement.
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)