# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import copy
import json
import logging
import os
import sys
import urllib.parse

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
# done.logtail is used by Runner.done() below; assuming the sibling module
# provides it.
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

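# Doctest-style sketch (the "_:" id below is a made-up example; real ids are
# randomly generated):
#
#   >>> literal = {"class": "File", "location": "_:5e6b9f", "contents": "x"}
#   >>> trim_anonymous_location(literal)
#   >>> "location" in literal
#   False
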
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

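# Illustrative sketch: find_defaults() recursively walks lists and dicts and
# calls op() on every mapping that carries a "default" key, so for
#
#   {"inputs": [{"id": "x", "default": {"class": "File", "path": "a.txt"}}]}
#
# op() is invoked once, with the inner {"id": "x", "default": ...} dict.
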
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(joborder,
                   files=[],                  # type: List[Dict[Text, Text]]
                   bindings=[],               # type: List[Dict[Text, Any]]
                   schemaDefs={},             # type: Dict[Text, Dict[Text, Any]]
                   names=None,                # type: Names
                   requirements=requirements, # type: List[Dict[Text, Any]]
                   hints=hints,               # type: List[Dict[Text, Any]]
                   resources={},              # type: Dict[str, int]
                   mutation_manager=None,     # type: Optional[MutationManager]
                   formatgraph=None,          # type: Optional[Graph]
                   make_fs_access=None,       # type: Type[StdFsAccess]
                   fs_access=None,            # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,                    # type: float
                   debug=runtimeContext.debug,                             # type: bool
                   js_console=runtimeContext.js_console,                   # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,     # type: bool
                   loadListing="",            # type: Text
                   outdir="",                 # type: Text
                   tmpdir="",                 # type: Text
                   stagedir="",               # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                  )

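# The Builder constructed above is only used to evaluate CWL expressions
# outside of a running job, which is why most fields are left empty.
# Hypothetical sketch of such an evaluation (values illustrative):
#
#   builder = make_builder({"threads": 4}, [], [], runtimeContext, metadata)
#   builder.do_eval("$(inputs.threads * 2)", context=None)  # -> 8
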
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]
    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")
        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

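# Pattern expansion sketch: with CWL v1.1+ each secondaryFiles entry is a
# {"pattern": ..., "required": ...} object, and the evaluated pattern is
# applied to the primary location with cwltool's substitute().  Assuming a
# primary file "file:///data/reads.bam":
#
#   substitute("file:///data/reads.bam", ".bai")   # -> "file:///data/reads.bam.bai"
#   substitute("file:///data/reads.bam", "^.bai")  # -> "file:///data/reads.bai"
#
# Each leading "^" strips one extension from the primary path before
# appending the remainder.
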
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}
    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)
    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)
    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching from API server until it
        # has nothing more to return.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)
    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})
    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)
    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid
    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

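# Hypothetical usage sketch (identifiers are illustrative): upload everything
# a tool document references and rewrite the references in place:
#
#   mapper = upload_dependencies(arvrunner, "tool dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                loadref_run=False)
#   mapper.mapper("file:///home/me/script.py").resolved
#   # -> "keep:99999999999999999999999999999999+99/script.py" (made-up PDH)
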
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
            if "id" in v:
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             arvrunner.project_uuid,
                                                                                                             arvrunner.runtimeContext.force_docker_pull,
                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash

    tag_git_version(packed)
    return packed

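# Provenance note: `git log --first-parent --max-count=1 --format=%H` prints
# the full 40-character SHA-1 of the checked-out commit, so a workflow packed
# from a git checkout carries e.g.
# {"http://schema.org/version": "1c002dd4..."} (illustrative hash); non-git
# directories are skipped silently.
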
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want it to do that,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    "%s input" % name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

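# Before/after sketch for one job order entry (paths and hash are made up):
#
#   {"reads": {"class": "File", "location": "file:///home/me/reads.bam"}}
#
# becomes, after secondaryFiles discovery and upload:
#
#   {"reads": {"class": "File",
#              "location": "keep:99999999999999999999999999999999+99/reads.bam",
#              "secondaryFiles": [...]}}
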
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

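# merged_map (built by upload_workflow_deps below) maps each tool document id
# to one FileUpdates tuple, e.g. (values illustrative):
#
#   FileUpdates(resolved={"file:///wf/script.py": "keep:9999...+99/script.py"},
#               secondaryFiles={})
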
def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

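# `img` is a Docker image reference such as "arvados/jobs:2.1.0"; on success
# the call presumably returns the image's collection identifier from
# arv_docker_get_image, and any failure is re-raised as a plain Exception so
# the caller gets a clear "image not available" message.
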
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

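# Reuse note: a portable data hash is content-addressed ("<md5>+<size>", e.g.
# "d41d8cd98f00b204e9800998ecf8427e+0" for empty content), so the filter
# above matches any existing collection whose workflow.cwl is byte-identical
# and avoids saving a duplicate copy.
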
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size
        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass
    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        # Exit code 33 is the arvados-cwl-runner convention
                        # for an unsupported requirement.
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
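
    # keepify sketch: paths in cwl.output.json are relative to the output
    # collection, e.g. {"location": "foo/bar.txt"} is rewritten to
    # {"location": "keep:<output collection identifier>/foo/bar.txt"}
    # (placeholder identifier).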