# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy
from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess
from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
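
# Illustrative sketch (assumed values): a File literal carries its data
# inline and gets an anonymous "_:" location id.
#   literal = {"class": "File", "location": "_:0d49f89...",
#              "basename": "a.txt", "contents": "hello"}
#   trim_anonymous_location(literal)
#   # literal -> {"class": "File", "basename": "a.txt", "contents": "hello"}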

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
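
# Illustrative sketch (assumed input): find_defaults invokes `op` on every
# mapping that carries a "default" key, at any nesting depth.
#   doc = {"inputs": [{"id": "x", "default": {"class": "File", "path": "a.txt"}}]}
#   find_defaults(doc, print)
#   # prints the single dict {"id": "x", "default": {...}}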

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],                  # type: List[Dict[Text, Text]]
                   bindings=[],               # type: List[Dict[Text, Any]]
                   schemaDefs={},             # type: Dict[Text, Dict[Text, Any]]
                   names=None,                # type: Names
                   requirements=requirements, # type: List[Dict[Text, Any]]
                   hints=hints,               # type: List[Dict[Text, Any]]
                   resources={},              # type: Dict[str, int]
                   mutation_manager=None,     # type: Optional[MutationManager]
                   formatgraph=None,          # type: Optional[Graph]
                   make_fs_access=None,       # type: Type[StdFsAccess]
                   fs_access=None,            # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,                    # type: float
                   debug=runtimeContext.debug,                             # type: bool
                   js_console=runtimeContext.js_console,                   # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,     # type: bool
                   loadListing="",            # type: Text
                   outdir="",                 # type: Text
                   tmpdir="",                 # type: Text
                   stagedir="",               # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                  )

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
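
# Illustrative sketch of the pattern handling above (assumed values): for a
# primary {"class": "File", "location": "keep:.../reads.bam"} with the spec
# {"pattern": ".bai", "required": true}, substitute() appends the suffix, so
# the secondary file is looked up at "keep:.../reads.bam.bai".  A leading "^"
# in the pattern (e.g. "^.bai") strips the primary's extension first,
# yielding "keep:.../reads.bai".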

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit
        # response size, so keep fetching until it has nothing more to
        # give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}

    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
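
# Illustrative usage sketch (assumed names and values):
#   mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                False)
#   mapper.mapper("file:///home/user/input.txt").resolved
#   # -> e.g. "keep:99999999999999999999999999999999+99/input.txt"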

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
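
# Illustrative CWL fragment (assumed example) that exercises the path above:
#   requirements:
#     DockerRequirement:
#       dockerPull: ubuntu:20.04
# upload_docker() ensures the pulled image is also stored as an Arvados
# collection before any container is submitted.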

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             arvrunner.project_uuid,
                                                                                                             arvrunner.runtimeContext.force_docker_pull,
                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

def tag_git_version(packed):
    # NOTE: reads `tool` from the surrounding scope; it is not a parameter.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash
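
# Illustrative sketch (assumed value): when the tool comes from a git
# checkout, the packed document gets tagged with the current commit, e.g.
#   packed["http://schema.org/version"] = b"e30fef3..."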

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want it to do that,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
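
# Illustrative sketch (assumed input): given a job order such as
#   {"input_bam": {"class": "File", "location": "file:///data/reads.bam"}}
# upload_job_order() uploads /data/reads.bam to Keep and rewrites the entry to
#   {"input_bam": {"class": "File", "location": "keep:<pdh>/reads.bam", ...}}
# with any discovered secondaryFiles attached alongside the primary file.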

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
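
# Illustrative sketch: the returned portable data hash is content-addressed,
# so resubmitting an unchanged workflow matches the existing collection via
# the filters above and skips save_new().  Assumed example value:
#   upload_workflow_collection(arvrunner, "my-wf", packed)
#   # -> "99999999999999999999999999999999+99"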

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
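
    # Illustrative workflow fragment (assumed syntax) for the
    # WorkflowRunnerResources hint consumed above:
    #   hints:
    #     arv:WorkflowRunnerResources:
    #       coresMin: 2
    #       ramMin: 2048
    #       keep_cache: 512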

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
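
# Illustrative sketch of keepify() above (assumed values): with
# record["output"] = "99999999999999999999999999999999+99", a relative output
# location "foo/bar.txt" becomes
# "keep:99999999999999999999999999999999+99/foo/bar.txt"; locations that
# already start with "keep:" are left untouched.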