# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence
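
# On POSIX Python 2, use subprocess32, a backport of the Python 3
# subprocess module with more reliable process handling.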
if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')
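
# Helpers in this module are shared by the runner classes: they upload
# workflow dependencies and Docker images to Keep, pack workflows into a
# single document, and track a submitted runner process to completion.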

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, remove it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
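

# Construct a bare cwltool Builder so CWL expressions (such as secondaryFiles
# patterns) can be evaluated outside the context of a real job. Most fields
# are left empty; only what expression evaluation needs is populated from the
# runtime context and the document metadata.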
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,             # type: Names
                 requirements=requirements,  # type: List[Dict[Text, Any]]
                 hints=hints,            # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,  # type: Optional[MutationManager]
                 formatgraph=None,       # type: Optional[Graph]
                 make_fs_access=None,    # type: Type[StdFsAccess]
                 fs_access=None,         # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,  # type: float
                 debug=runtimeContext.debug,  # type: bool
                 js_console=runtimeContext.js_console,  # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                 container_engine="docker"
                )


def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None


primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))


def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                specs.append({"pattern": pattern})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = sf["basename"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                found.append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
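
# discover_secondary_files() below walks the tool's input schemas and fills
# in "secondaryFiles" on matching entries of the job order, recording newly
# found files in "discovered".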


def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)


def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references. Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until it has nothing more to
        # give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
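
# Typical usage of upload_dependencies (a sketch; mirrors the call in
# upload_workflow_deps below):
#
#   pm = upload_dependencies(arvrunner, "workflow dependencies",
#                            tool.doc_loader, tool.tool, tool.tool["id"],
#                            False, include_primary=False)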


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             arvrunner.project_uuid,
                                                                                                             arvrunner.runtimeContext.force_docker_pull,
                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed
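
# tag_git_version() stamps a packed workflow with the git commit of the
# source tree it was loaded from. Note it reads "tool" from the enclosing
# scope rather than taking it as a parameter, so it only works where such a
# variable is defined.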


def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order


FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
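
# For one tool document, FileUpdates pairs the mapping of original file
# locations to resolved keep: references ("resolved") with the secondaryFiles
# discovered for those locations (see upload_workflow_deps below).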


def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map


def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
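
# upload_workflow_collection() serializes a packed workflow to "workflow.cwl"
# in a new collection and saves it, reusing an existing collection when one
# with the same portable data hash and name prefix already exists in the
# target project.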


def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self
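
    # update_pipeline_component is a no-op in this base class; subclasses
    # are presumably expected to override it to record runner state with
    # their respective APIs.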
    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)