# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess
from schema_salad.sourceline import SourceLine, cmap
import schema_salad.validate as validate

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute, Builder
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION

import arvados.collection
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')
def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
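
# Illustrative example (a sketch, not part of the module API): a File literal
# that was assigned an anonymous id loses its 'location', while a real
# location is left alone:
#
#     literal = {"class": "File", "location": "_:id-0123", "basename": "a.txt"}
#     trim_anonymous_location(literal)   # 'location' removed
#
#     real = {"class": "File", "location": "keep:<pdh>/a.txt"}
#     trim_anonymous_location(real)      # unchanged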
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]
def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
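
# Illustrative example: find_defaults() walks nested lists/dicts and applies
# `op` to every dict that carries a "default" key (values here are made up):
#
#     hits = []
#     find_defaults({"inputs": [{"id": "x", "default": {"class": "File"}}]},
#                   hits.append)
#     # hits == [{"id": "x", "default": {"class": "File"}}]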
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(joborder,
                   files=[],                  # type: List[Dict[Text, Text]]
                   bindings=[],               # type: List[Dict[Text, Any]]
                   schemaDefs={},             # type: Dict[Text, Dict[Text, Any]]
                   names=None,                # type: Names
                   requirements=requirements, # type: List[Dict[Text, Any]]
                   hints=hints,               # type: List[Dict[Text, Any]]
                   resources={},              # type: Dict[str, int]
                   mutation_manager=None,     # type: Optional[MutationManager]
                   formatgraph=None,          # type: Optional[Graph]
                   make_fs_access=None,       # type: Type[StdFsAccess]
                   fs_access=None,            # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,                    # type: float
                   debug=runtimeContext.debug,                             # type: bool
                   js_console=runtimeContext.js_console,                   # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,     # type: bool
                   loadListing="",            # type: Text
                   outdir="",                 # type: Text
                   tmpdir="",                 # type: Text
                   stagedir="",               # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                  )
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None
primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # a secondaryFiles spec on the schema may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            # Nothing to do.
            return

        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)

                if sfname is None:
                    continue

                p_location = primary["location"]
                if "/" in p_location:
                    sfpath = (
                        p_location[0 : p_location.rindex("/") + 1]
                        + sfname
                    )

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
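
# The pattern semantics handled above follow the CWL secondaryFiles rules as
# implemented by cwltool's `substitute`: a plain suffix is appended to the
# primary basename, and each leading "^" strips one extension first.
# For example:
#
#     substitute("reads.bam", ".bai")    # -> "reads.bam.bai"
#     substitute("reads.bam", "^.bai")   # -> "reads.bai"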
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)
    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)
    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until it has nothing more to
        # give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})
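
    # At this point uuid_map translates collection uuids to portable data
    # hashes, e.g. (values hypothetical):
    #
    #     {"zzzzz-4zz18-0123456789abcde": "99999999999999999999999999999999+99"}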
    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)
    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid
    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
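
# Callers pass the tool document or job order through upload_dependencies and
# then use the returned pathmapper to rewrite references; see
# upload_job_order() and upload_workflow_deps() below for the real call
# sites. A minimal sketch with placeholder arguments:
#
#     mapper = upload_dependencies(arvrunner, "tool dependencies",
#                                  tool.doc_loader, tool.tool,
#                                  tool.tool["id"], False)
#     # mapper.mapper(local_uri).resolved -> "keep:..." reference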
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, v, True,
                    arvrunner.project_uuid,
                    arvrunner.runtimeContext.force_docker_pull,
                    arvrunner.runtimeContext.tmp_outdir_prefix,
                    arvrunner.runtimeContext.match_local_docker)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed
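
# For reference, a packed document gathers every process object under
# "$graph" with the top-level workflow as "#main", e.g. (heavily abbreviated
# sketch):
#
#     {"cwlVersion": "v1.2",
#      "$graph": [{"class": "Workflow", "id": "#main", ...},
#                 {"class": "CommandLineTool", "id": "#step1.cwl", ...}]}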
def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(
                ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
                stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return an updated
    input object with 'location' rewritten to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want that,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
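
# Net effect on an input object (locations hypothetical):
#
#     {"reads": {"class": "File", "location": "file:///tmp/reads.bam"}}
#
# comes back with the file uploaded to Keep and its location rewritten:
#
#     {"reads": {"class": "File", "location": "keep:<pdh>/reads.bam"}}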
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
                                                          arvrunner.runtimeContext.match_local_docker)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
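
# Design note: because existing collections are looked up by
# portable_data_hash before saving, re-registering an identical packed
# workflow reuses the stored copy instead of creating a duplicate.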
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self
    def update_pipeline_component(self, record):
        pass
    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
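
    # Sketch of the minimal record fields done() consumes (values
    # hypothetical):
    #
    #     {"state": "Complete",
    #      "exit_code": 0,          # 33 signals UnsupportedRequirement
    #      "output": "<pdh of output collection>",
    #      "log": "<pdh of log collection>",
    #      "uuid": "zzzzz-xvhdp-0123456789abcde"}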