4a8a22a4a99c23ca8f7b0df04ceba5b0a67d5890
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8
9 import os
10 import sys
11 import re
12 import urllib.parse
13 from functools import partial
14 import logging
15 import json
16 import copy
17 from collections import namedtuple
18 from io import StringIO
19 from typing import Mapping, Sequence
20
21 if os.name == "posix" and sys.version_info[0] < 3:
22     import subprocess32 as subprocess
23 else:
24     import subprocess
25
26 from schema_salad.sourceline import SourceLine, cmap
27
28 from cwltool.command_line_tool import CommandLineTool
29 import cwltool.workflow
30 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
31                              shortname, Process, fill_in_defaults)
32 from cwltool.load_tool import fetch_document
33 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
34 from cwltool.utils import aslist
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 import ruamel.yaml as yaml
44
45 import arvados_cwl.arvdocker
46 from .pathmapper import ArvPathMapper, trim_listing
47 from ._version import __version__
48 from . import done
49 from .context import ArvRuntimeContext
50
51 logger = logging.getLogger('arvados.cwl-runner')
52
53 def trim_anonymous_location(obj):
54     """Remove 'location' field from File and Directory literals.
55
56     To make internal handling easier, literals are assigned a random id for
57     'location'.  However, when writing the record back out, this can break
58     reproducibility.  Since it is valid for literals not to have a 'location'
59     field, remove it.
60
61     """
62
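    # Example (illustrative values): a literal like
    # {"class": "File", "location": "_:5gg9...", "basename": "a.txt", "contents": "x"}
    # has its "location" removed, while real locations (e.g. "keep:...") are left alone.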
63     if obj.get("location", "").startswith("_:"):
64         del obj["location"]
65
66
67 def remove_redundant_fields(obj):
68     for field in ("path", "nameext", "nameroot", "dirname"):
69         if field in obj:
70             del obj[field]
71
72
73 def find_defaults(d, op):
74     if isinstance(d, list):
75         for i in d:
76             find_defaults(i, op)
77     elif isinstance(d, dict):
78         if "default" in d:
79             op(d)
80         else:
81             for i in viewvalues(d):
82                 find_defaults(i, op)
83
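# Construct a minimal cwltool Builder for expression evaluation.  Note: most
# fields are empty placeholders; within this module the Builder is only used to
# evaluate secondaryFiles "pattern"/"required" expressions via do_eval().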
84 def make_builder(joborder, hints, requirements, runtimeContext):
85     return Builder(
86                  job=joborder,
87                  files=[],               # type: List[Dict[Text, Text]]
88                  bindings=[],            # type: List[Dict[Text, Any]]
89                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
90                  names=None,               # type: Names
91                  requirements=requirements,        # type: List[Dict[Text, Any]]
92                  hints=hints,               # type: List[Dict[Text, Any]]
93                  resources={},           # type: Dict[str, int]
94                  mutation_manager=None,    # type: Optional[MutationManager]
95                  formatgraph=None,         # type: Optional[Graph]
96                  make_fs_access=None,      # type: Type[StdFsAccess]
97                  fs_access=None,           # type: StdFsAccess
98                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
99                  timeout=runtimeContext.eval_timeout,             # type: float
100                  debug=runtimeContext.debug,               # type: bool
101                  js_console=runtimeContext.js_console,          # type: bool
102                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
103                  loadListing="",         # type: Text
104                  outdir="",              # type: Text
105                  tmpdir="",              # type: Text
106                  stagedir="",            # type: Text
107                 )
108
109
110 def set_secondary(fsaccess, builder, inputschema, primary, discovered):
111     if isinstance(primary, Mapping) and primary.get("class") == "File":
112         if "secondaryFiles" not in primary:
113             primary["secondaryFiles"] = []
114             for i, sf in enumerate(inputschema["secondaryFiles"]):
115                 pattern = builder.do_eval(sf["pattern"], context=primary)
116                 if pattern is None:
117                     continue
118                 sfpath = substitute(primary["location"], pattern)
119                 required = builder.do_eval(sf["required"], context=primary)
120
121                 if fsaccess.exists(sfpath):
122                     primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
123                 elif required:
124                     raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
125                         "Required secondary file '%s' does not exist" % sfpath)
126
127             primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
128             if discovered is not None:
129                 discovered[primary["location"]] = primary["secondaryFiles"]
130     elif isinstance(primary, Sequence):
131         for e in primary:
132             set_secondary(fsaccess, builder, inputschema, e, discovered)
133
134 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
135     for inputschema in inputs:
136         primary = job_order.get(shortname(inputschema["id"]))
137         if isinstance(primary, (Mapping, Sequence)) and inputschema.get("secondaryFiles"):
138             set_secondary(fsaccess, builder, inputschema, primary, discovered)
139
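# keep: references name a collection either by uuid (e.g.
# "keep:zzzzz-4zz18-0123456789abcde/file.txt", illustrative value) or by
# portable data hash (32 hex digits plus "+<size>"), optionally followed by a
# path within the collection.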
140 collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
141 collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
142
143 def upload_dependencies(arvrunner, name, document_loader,
144                         workflowobj, uri, loadref_run,
145                         include_primary=True, discovered_secondaryfiles=None):
146     """Upload the dependencies of the workflowobj document to Keep.
147
148     Returns a pathmapper object mapping local paths to keep references.  Also
149     does an in-place update of references in "workflowobj".
150
151     Use scandeps to find $import, $include, $schemas, run, File and Directory
152     fields that represent external references.
153
154     If workflowobj has an "id" field, this will reload the document to ensure
155     it is scanning the raw document prior to preprocessing.
156     """
157
158     loaded = set()
159     def loadref(b, u):
160         joined = document_loader.fetcher.urljoin(b, u)
161         defrg, _ = urllib.parse.urldefrag(joined)
162         if defrg not in loaded:
163             loaded.add(defrg)
164             # Use fetch_text to get raw file (before preprocessing).
165             text = document_loader.fetch_text(defrg)
166             if isinstance(text, bytes):
167                 textIO = StringIO(text.decode('utf-8'))
168             else:
169                 textIO = StringIO(text)
170             return yaml.safe_load(textIO)
171         else:
172             return {}
173
174     if loadref_run:
175         loadref_fields = set(("$import", "run"))
176     else:
177         loadref_fields = set(("$import",))
178
179     scanobj = workflowobj
180     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
181         # Need raw file content (before preprocessing) to ensure
182         # that external references in $include and $mixin are captured.
183         scanobj = loadref("", workflowobj["id"])
184
185     sc_result = scandeps(uri, scanobj,
186                   loadref_fields,
187                   set(("$include", "$schemas", "location")),
188                   loadref, urljoin=document_loader.fetcher.urljoin)
189
190     sc = []
191     uuids = {}
192
193     def collect_uuids(obj):
194         loc = obj.get("location", "")
195         sp = loc.split(":")
196         if sp[0] == "keep":
197             # Collect collection uuids that need to be resolved to
198             # portable data hashes
199             gp = collection_uuid_pattern.match(loc)
200             if gp:
201                 uuids[gp.groups()[0]] = obj
202             if collectionUUID in obj:
203                 uuids[obj[collectionUUID]] = obj
204
205     def collect_uploads(obj):
206         loc = obj.get("location", "")
207         sp = loc.split(":")
208         if len(sp) < 1:
209             return
210         if sp[0] in ("file", "http", "https"):
211             # Record local files that need to be uploaded,
212             # don't include file literals, keep references, etc.
213             sc.append(obj)
214         collect_uuids(obj)
215
216     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
217     visit_class(sc_result, ("File", "Directory"), collect_uploads)
218
219     # Resolve any collection uuids we found to portable data hashes
220     # and assign them to uuid_map
221     uuid_map = {}
222     fetch_uuids = list(uuids.keys())
223     while fetch_uuids:
224         # For a large number of fetch_uuids, the API server may limit the
225         # response size, so keep fetching until the API server has nothing
226         # more to give us.
227         lookups = arvrunner.api.collections().list(
228             filters=[["uuid", "in", fetch_uuids]],
229             count="none",
230             select=["uuid", "portable_data_hash"]).execute(
231                 num_retries=arvrunner.num_retries)
232
233         if not lookups["items"]:
234             break
235
236         for l in lookups["items"]:
237             uuid_map[l["uuid"]] = l["portable_data_hash"]
238
239         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
240
241     normalizeFilesDirs(sc)
242
243     if include_primary and "id" in workflowobj:
244         sc.append({"class": "File", "location": workflowobj["id"]})
245
246     if "$schemas" in workflowobj:
247         for s in workflowobj["$schemas"]:
248             sc.append({"class": "File", "location": s})
249
250     def visit_default(obj):
251         remove = [False]
252         def ensure_default_location(f):
253             if "location" not in f and "path" in f:
254                 f["location"] = f["path"]
255                 del f["path"]
256             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
257                 # Doesn't exist, remove from list of dependencies to upload
258                 sc[:] = [x for x in sc if x["location"] != f["location"]]
259                 # Delete "default" from workflowobj
260                 remove[0] = True
261         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
262         if remove[0]:
263             del obj["default"]
264
265     find_defaults(workflowobj, visit_default)
266
267     discovered = {}
268     def discover_default_secondary_files(obj):
269         builder_job_order = {}
270         for t in obj["inputs"]:
271             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
272         # Need to create a builder object to evaluate expressions.
273         builder = make_builder(builder_job_order,
274                                obj.get("hints", []),
275                                obj.get("requirements", []),
276                                ArvRuntimeContext())
277         discover_secondary_files(arvrunner.fs_access,
278                                  builder,
279                                  obj["inputs"],
280                                  builder_job_order,
281                                  discovered)
282
283     visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
284
285     for d in list(discovered):
286         # Only interested in discovered secondaryFiles which are local
287         # files that need to be uploaded.
288         if d.startswith("file:"):
289             sc.extend(discovered[d])
290         else:
291             del discovered[d]
292
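    # Upload everything collected in "sc" to Keep (as a single collection) and
    # build a mapping from original locations to keep: references.  The "keep:%s"
    # and "keep:%s/%s" arguments are assumed to be the templates ArvPathMapper
    # uses for resolved collection and file locations, respectively.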
293     mapper = ArvPathMapper(arvrunner, sc, "",
294                            "keep:%s",
295                            "keep:%s/%s",
296                            name=name,
297                            single_collection=True)
298
299     def setloc(p):
300         loc = p.get("location")
301         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
302             p["location"] = mapper.mapper(p["location"]).resolved
303             return
304
305         if not loc:
306             return
307
308         if collectionUUID in p:
309             uuid = p[collectionUUID]
310             if uuid not in uuid_map:
311                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
312                     "Collection uuid %s not found" % uuid)
313             gp = collection_pdh_pattern.match(loc)
314             if gp and uuid_map[uuid] != gp.groups()[0]:
315                 # This file entry has both collectionUUID and a PDH
316                 # location. If the PDH doesn't match the one returned by
317                 # the API server, raise an error.
318                 raise SourceLine(p, "location", validate.ValidationException).makeError(
319                     "Expected collection uuid %s to be %s but API server reported %s" % (
320                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
321
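        # Rewrite keep:<collection uuid>/... references to keep:<portable data
        # hash>/..., recording the original uuid under the collectionUUID
        # extension field.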
322         gp = collection_uuid_pattern.match(loc)
323         if not gp:
324             return
325         uuid = gp.groups()[0]
326         if uuid not in uuid_map:
327             raise SourceLine(p, "location", validate.ValidationException).makeError(
328                 "Collection uuid %s not found" % uuid)
329         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
330         p[collectionUUID] = uuid
331
332     visit_class(workflowobj, ("File", "Directory"), setloc)
333     visit_class(discovered, ("File", "Directory"), setloc)
334
335     if discovered_secondaryfiles is not None:
336         for d in discovered:
337             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
338
339     if "$schemas" in workflowobj:
340         sch = []
341         for s in workflowobj["$schemas"]:
342             sch.append(mapper.mapper(s).resolved)
343         workflowobj["$schemas"] = sch
344
345     return mapper
346
347
348 def upload_docker(arvrunner, tool):
349     """Uploads Docker images used in CommandLineTool objects."""
350
351     if isinstance(tool, CommandLineTool):
352         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
353         if docker_req:
354             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
355                 # TODO: can be supported by containers API, but not jobs API.
356                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
357                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
358             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
359         else:
360             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
361     elif isinstance(tool, cwltool.workflow.Workflow):
362         for s in tool.steps:
363             upload_docker(arvrunner, s.embedded_tool)
364
365
366 def packed_workflow(arvrunner, tool, merged_map):
367     """Create a packed workflow.
368
369     A "packed" workflow is one where all the components have been combined into a single document."""
370
371     rewrites = {}
372     packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
373                   tool.tool["id"], tool.metadata, rewrite_out=rewrites)
374
375     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
376
377     def visit(v, cur_id):
378         if isinstance(v, dict):
379             if v.get("class") in ("CommandLineTool", "Workflow"):
380                 if "id" not in v:
381                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
382                 cur_id = rewrite_to_orig.get(v["id"], v["id"])
383             if "location" in v and not v["location"].startswith("keep:"):
384                 v["location"] = merged_map[cur_id].resolved[v["location"]]
385             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
386                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
387             if v.get("class") == "DockerRequirement":
388                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
389             for l in v:
390                 visit(v[l], cur_id)
391         if isinstance(v, list):
392             for l in v:
393                 visit(l, cur_id)
394     visit(packed, None)
395     return packed
396
397
398 def tag_git_version(packed, tool):
399     if tool.tool["id"].startswith("file://"):
400         path = os.path.dirname(tool.tool["id"][7:])
401         try:
402             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
403         except (OSError, subprocess.CalledProcessError):
404             pass
405         else:
406             packed["http://schema.org/version"] = githash
407
408
409 def upload_job_order(arvrunner, name, tool, job_order):
410     """Upload local files referenced in the input object and return updated input
411     object with 'location' updated to the proper keep references.
412     """
413
414     # Make a copy of the job order and set defaults.
415     builder_job_order = copy.copy(job_order)
416     fill_in_defaults(tool.tool["inputs"],
417                      builder_job_order,
418                      arvrunner.fs_access)
419     # Need to create a builder object to evaluate expressions.
420     builder = make_builder(builder_job_order,
421                            tool.tool.get("hints", []),
422                            tool.tool.get("requirements", []),
423                            ArvRuntimeContext())
424     # Now update job_order with secondaryFiles
425     discover_secondary_files(arvrunner.fs_access,
426                              builder,
427                              tool.tool["inputs"],
428                              job_order)
429
430     jobmapper = upload_dependencies(arvrunner,
431                                     name,
432                                     tool.doc_loader,
433                                     job_order,
434                                     job_order.get("id", "#"),
435                                     False)
436
437     if "id" in job_order:
438         del job_order["id"]
439
440     # Need to filter this out; it gets added by cwltool when providing
441     # parameters on the command line.
442     if "job_order" in job_order:
443         del job_order["job_order"]
444
445     return job_order
446
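# FileUpdates.resolved maps a tool's original file locations to their uploaded
# keep: references; FileUpdates.secondaryFiles maps primary file locations to
# the secondaryFiles discovered for them (see upload_workflow_deps below).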
447 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
448
449 def upload_workflow_deps(arvrunner, tool):
450     # Ensure that Docker images needed by this workflow are available
451
452     upload_docker(arvrunner, tool)
453
454     document_loader = tool.doc_loader
455
456     merged_map = {}
457
458     def upload_tool_deps(deptool):
459         if "id" in deptool:
460             discovered_secondaryfiles = {}
461             pm = upload_dependencies(arvrunner,
462                                      "%s dependencies" % (shortname(deptool["id"])),
463                                      document_loader,
464                                      deptool,
465                                      deptool["id"],
466                                      False,
467                                      include_primary=False,
468                                      discovered_secondaryfiles=discovered_secondaryfiles)
469             document_loader.idx[deptool["id"]] = deptool
470             toolmap = {}
471             for k,v in pm.items():
472                 toolmap[k] = v.resolved
473             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
474
475     tool.visit(upload_tool_deps)
476
477     return merged_map
478
479 def arvados_jobs_image(arvrunner, img):
480     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
481
482     try:
483         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
484     except Exception as e:
485         raise Exception("Docker image %s is not available\n%s" % (img, e) )
486
487
488 def upload_workflow_collection(arvrunner, name, packed):
489     collection = arvados.collection.Collection(api_client=arvrunner.api,
490                                                keep_client=arvrunner.keep_client,
491                                                num_retries=arvrunner.num_retries)
492     with collection.open("workflow.cwl", "w") as f:
493         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
494
495     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
496                ["name", "like", name+"%"]]
497     if arvrunner.project_uuid:
498         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
499     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
500
501     if exists["items"]:
502         logger.info("Using collection %s", exists["items"][0]["uuid"])
503     else:
504         collection.save_new(name=name,
505                             owner_uuid=arvrunner.project_uuid,
506                             ensure_unique_name=True,
507                             num_retries=arvrunner.num_retries)
508         logger.info("Uploaded to %s", collection.manifest_locator())
509
510     return collection.portable_data_hash()
511
512
513 class Runner(Process):
514     """Base class for runner processes, which submit an instance of
515     arvados-cwl-runner and wait for the final result."""
516
517     def __init__(self, runner, tool, loadingContext, enable_reuse,
518                  output_name, output_tags, submit_runner_ram=0,
519                  name=None, on_error=None, submit_runner_image=None,
520                  intermediate_output_ttl=0, merged_map=None,
521                  priority=None, secret_store=None,
522                  collection_cache_size=256,
523                  collection_cache_is_default=True):
524
525         loadingContext = loadingContext.copy()
526         loadingContext.metadata = loadingContext.metadata.copy()
527         loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION
528
529         super(Runner, self).__init__(tool.tool, loadingContext)
530
531         self.arvrunner = runner
532         self.embedded_tool = tool
533         self.job_order = None
534         self.running = False
535         if enable_reuse:
536             # If reuse is permitted by command line arguments but
537             # disabled by the workflow itself, disable it.
538             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
539             if reuse_req:
540                 enable_reuse = reuse_req["enableReuse"]
541         self.enable_reuse = enable_reuse
542         self.uuid = None
543         self.final_output = None
544         self.output_name = output_name
545         self.output_tags = output_tags
546         self.name = name
547         self.on_error = on_error
548         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
549         self.intermediate_output_ttl = intermediate_output_ttl
550         self.priority = priority
551         self.secret_store = secret_store
552         self.enable_dev = loadingContext.enable_dev
553
554         self.submit_runner_cores = 1
555         self.submit_runner_ram = 1024  # default 1 GiB
556         self.collection_cache_size = collection_cache_size
557
558         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
559         if runner_resource_req:
560             if runner_resource_req.get("coresMin"):
561                 self.submit_runner_cores = runner_resource_req["coresMin"]
562             if runner_resource_req.get("ramMin"):
563                 self.submit_runner_ram = runner_resource_req["ramMin"]
564             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
565                 self.collection_cache_size = runner_resource_req["keep_cache"]
566
567         if submit_runner_ram:
568             # Command line / initializer overrides default and/or spec from workflow
569             self.submit_runner_ram = submit_runner_ram
570
571         if self.submit_runner_ram <= 0:
572             raise Exception("Value of submit-runner-ram must be greater than zero")
573
574         if self.submit_runner_cores <= 0:
575             raise Exception("Value of submit-runner-cores must be greater than zero")
576
577         self.merged_map = merged_map or {}
578
579     def job(self,
580             job_order,         # type: Mapping[Text, Text]
581             output_callbacks,  # type: Callable[[Any, Any], Any]
582             runtimeContext     # type: RuntimeContext
583            ):  # type: (...) -> Generator[Any, None, None]
584         self.job_order = job_order
585         self._init_job(job_order, runtimeContext)
586         yield self
587
588     def update_pipeline_component(self, record):
589         pass
590
591     def done(self, record):
592         """Base method for handling a completed runner."""
593
594         try:
595             if record["state"] == "Complete":
596                 if record.get("exit_code") is not None:
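                    # Map the runner's exit code to a CWL process status: 33 is
                    # treated as UnsupportedRequirement (assumed convention of the
                    # arvados-cwl-runner child process); 0 is success, anything
                    # else is a permanent failure.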
597                     if record["exit_code"] == 33:
598                         processStatus = "UnsupportedRequirement"
599                     elif record["exit_code"] == 0:
600                         processStatus = "success"
601                     else:
602                         processStatus = "permanentFail"
603                 else:
604                     processStatus = "success"
605             else:
606                 processStatus = "permanentFail"
607
608             outputs = {}
609
610             if processStatus == "permanentFail":
611                 logc = arvados.collection.CollectionReader(record["log"],
612                                                            api_client=self.arvrunner.api,
613                                                            keep_client=self.arvrunner.keep_client,
614                                                            num_retries=self.arvrunner.num_retries)
615                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
616
617             self.final_output = record["output"]
618             outc = arvados.collection.CollectionReader(self.final_output,
619                                                        api_client=self.arvrunner.api,
620                                                        keep_client=self.arvrunner.keep_client,
621                                                        num_retries=self.arvrunner.num_retries)
622             if "cwl.output.json" in outc:
623                 with outc.open("cwl.output.json", "rb") as f:
624                     if f.size() > 0:
625                         outputs = json.loads(f.read().decode())
626             def keepify(fileobj):
627                 path = fileobj["location"]
628                 if not path.startswith("keep:"):
629                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
630             adjustFileObjs(outputs, keepify)
631             adjustDirObjs(outputs, keepify)
632         except Exception:
633             logger.exception("[%s] While getting final output object", self.name)
634             self.arvrunner.output_callback({}, "permanentFail")
635         else:
636             self.arvrunner.output_callback(outputs, processStatus)