# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from .context import ArvRuntimeContext
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

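# Example (hypothetical input, for illustration): given
#   {"inputs": [{"id": "x", "default": {"class": "File", "path": "a.txt"}}]}
# find_defaults(doc, op) calls op() once, on the inner dict carrying the
# "default" key, and does not descend into that dict any further.
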
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,             # type: Names
                 requirements=requirements,  # type: List[Dict[Text, Any]]
                 hints=hints,            # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,  # type: Optional[MutationManager]
                 formatgraph=None,       # type: Optional[Graph]
                 make_fs_access=None,    # type: Type[StdFsAccess]
                 fs_access=None,         # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,   # type: float
                 debug=runtimeContext.debug,            # type: bool
                 js_console=runtimeContext.js_console,  # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                 container_engine="docker"
                )

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

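# A secondaryFiles pattern is applied to the primary file's location with
# cwltool's substitute(): for a primary "reads.bam", the pattern ".bai"
# yields "reads.bam.bai", while "^.bai" strips the last extension first and
# yields "reads.bai".  (Illustrative example; the behavior comes from cwltool.)
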
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

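# discover_secondary_files() is the entry point used below (in upload_job_order
# and in upload_dependencies' default handling) to walk a tool's input schema
# against a job order and record any secondaryFiles found in 'discovered'.
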
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)
    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit
        # the response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)
    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)
    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

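# Typical call (illustrative only, names are hypothetical): mapping the files a
# tool document references into Keep before submission:
#
#   mapper = upload_dependencies(arvrunner, "my tool dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                False, include_primary=False)
#
# After this returns, file:/http(s): locations inside the document have been
# rewritten to keep: references, and 'mapper' can translate the originals.
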
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

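# Note: a CommandLineTool with no DockerRequirement still gets an image pinned
# here (the matching arvados/jobs image), so every submitted step runs in a
# known container.  Workflows are handled by recursing into each step's
# embedded tool.
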
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             arvrunner.project_uuid,
                                                                                                             arvrunner.runtimeContext.force_docker_pull,
                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed


def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

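# tag_git_version() stamps the packed document with the git commit of the
# source checkout when the workflow came from a local file://, and quietly
# skips tagging if git or the repository is unavailable.  Note that it reads
# 'tool' without taking it as a parameter, so it only works where that name
# is already in scope.
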
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
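# FileUpdates pairs, for one tool document, the mapping of original file
# locations to their Keep locations ('resolved') with any secondaryFiles that
# were discovered for them; upload_workflow_deps() builds one per tool id.
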
def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

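# merged_map ends up keyed by each tool document's id, so packed_workflow()
# can later rewrite file references inside the packed document to the Keep
# locations recorded here.
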
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

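# Callers typically pass the pinned runner image tag (for example
# "arvados/jobs:" + __version__, or a user-supplied override), so a missing or
# unpullable image is reported before submission rather than inside the
# submitted container.
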
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

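# The upload is content-addressed: if a collection with the same
# portable_data_hash (and a matching name prefix) already exists in the target
# project, it is reused instead of saving a duplicate copy of the packed
# workflow.
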
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
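    # The runner's own resources can be tuned from inside the workflow with a
    # hint like the following (illustrative CWL snippet; the "arv" prefix maps
    # to http://arvados.org/cwl#):
    #
    #   hints:
    #     arv:WorkflowRunnerResources:
    #       coresMin: 2
    #       ramMin: 2048
    #       keep_cache: 512
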
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass
    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
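
# In Runner.done() above, exit code 33 is treated as the runner container's
# signal that the workflow required an unsupported feature, so it is reported
# as UnsupportedRequirement rather than a generic permanentFail.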