# Copyright (C) The Arvados Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import copy
import json
import logging
import urllib.parse

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute, Builder
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
import schema_salad.validate as validate

import arvados.collection

from . import done
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, remove it.
    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        obj.pop(field, None)


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
        job=joborder,
        files=[],                    # type: List[Dict[Text, Text]]
        bindings=[],                 # type: List[Dict[Text, Any]]
        schemaDefs={},               # type: Dict[Text, Dict[Text, Any]]
        names=None,                  # type: Names
        requirements=requirements,   # type: List[Dict[Text, Any]]
        hints=hints,                 # type: List[Dict[Text, Any]]
        resources={},                # type: Dict[str, int]
        mutation_manager=None,       # type: Optional[MutationManager]
        formatgraph=None,            # type: Optional[Graph]
        make_fs_access=None,         # type: Type[StdFsAccess]
        fs_access=None,              # type: StdFsAccess
        job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
        timeout=runtimeContext.eval_timeout,                     # type: float
        debug=runtimeContext.debug,                              # type: bool
        js_console=runtimeContext.js_console,                    # type: bool
        force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
        loadListing="",              # type: Text
        outdir="",                   # type: Text
        tmpdir="",                   # type: Text
        stagedir="",                 # type: Text
        cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
        container_engine="docker"
    )
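
# Illustrative usage (mirroring the calls in discover_default_secondary_files
# and upload_job_order below; argument values marked hypothetical are not taken
# from this file):
#
#   builder = make_builder(builder_job_order,
#                          obj.get("hints", []),
#                          obj.get("requirements", []),
#                          ArvRuntimeContext(),   # hypothetical runtime context
#                          tool.metadata)         # hypothetical metadata source
#   pattern = builder.do_eval(".bai", context=primary)
#
# The Builder constructed here is only used to evaluate CWL expressions via
# do_eval(); the file- and resource-related fields are left empty.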

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None


primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
            not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
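
# Illustrative example (hypothetical values): given an input schema
#   {"type": "File", "secondaryFiles": [{"pattern": ".bai", "required": True}]}
# and a primary value
#   {"class": "File", "location": "keep:.../reads.bam"}
# set_secondary() evaluates the pattern, checks through fsaccess that
# "keep:.../reads.bam.bai" exists, attaches it under primary["secondaryFiles"],
# and records the same list in `discovered` keyed by the primary's location.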

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """
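
    # A sketch of a typical call (mirroring upload_workflow_deps and
    # upload_job_order below; the argument values are illustrative):
    #
    #   mapper = upload_dependencies(arvrunner, "workflow dependencies",
    #                                tool.doc_loader, tool.tool, tool.tool["id"],
    #                                False, include_primary=False)
    #
    # Afterwards, mapper.mapper(<original location>).resolved yields the
    # corresponding "keep:..." reference for each uploaded dependency.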

    loaded = set()

    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,

    optional_deps = scandeps(uri, scanobj,
                             loadref, urljoin=document_loader.fetcher.urljoin,

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        # Collect collection uuids that need to be resolved to
        # portable data hashes
        gp = collection_uuid_pattern.match(loc)
        if gp:
            uuids[gp.groups()[0]] = obj
        if collectionUUID in obj:
            uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until it has nothing more to return.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    #if "$schemas" in workflowobj:
    #    for s in workflowobj["$schemas"]:
    #        sc.append({"class": "File", "location": s})

    def visit_default(obj):
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
            optional_deps.append(f)
            #if "location" in f and not arvrunner.fs_access.exists(f["location"]):
            #    # Doesn't exist, remove from list of dependencies to upload
            #    sc[:] = [x for x in sc if x["location"] != f["location"]]
            #    # Delete "default" from workflowobj
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)

    find_defaults(workflowobj, visit_default)

    discovered = {}

    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
        discover_secondary_files(arvrunner.fs_access,

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])

    mapper = ArvPathMapper(arvrunner, sc, "",
                           single_collection=True,
                           optional_deps=optional_deps)
415 print("whargh", mapper._pathmap)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid
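
    # Illustrative example of the rewriting setloc performs (identifiers are
    # made up): a location like
    #     "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/dir/reads.bam"
    # is rewritten to
    #     "keep:<portable data hash>/dir/reads.bam"
    # while the original collection UUID is preserved under the
    # http://arvados.org/cwl#collectionUUID key.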

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             arvrunner.project_uuid,
                                                                                                             arvrunner.runtimeContext.force_docker_pull,
                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix,
                                                                                                             arvrunner.runtimeContext.match_local_docker)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return an updated
    input object with 'location' pointing to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,

    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,

    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,

    jobmapper = upload_dependencies(arvrunner,
                                    job_order.get("id", "#"),

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
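
# The merged_map built by upload_workflow_deps below maps each tool/workflow
# document id to one FileUpdates entry, roughly:
#   resolved:       {original location: "keep:..." reference}
#   secondaryFiles: {original location: discovered secondaryFiles list}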

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        discovered_secondaryfiles = {}
        pm = upload_dependencies(arvrunner,
                                 "%s dependencies" % (shortname(deptool["id"])),
                                 include_primary=False,
                                 discovered_secondaryfiles=discovered_secondaryfiles)
        document_loader.idx[deptool["id"]] = deptool
        toolmap = {}
        for k, v in pm.items():
            toolmap[k] = v.resolved
        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
                                                          arvrunner.runtimeContext.match_local_docker)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None

        # If reuse is permitted by command line arguments but
        # disabled by the workflow itself, disable it.
        reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
        if reuse_req:
            enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse

        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]
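
        # A WorkflowRunnerResources hint of roughly the following shape is what
        # populates the values above (illustrative YAML, using the conventional
        # "arv" namespace prefix for http://arvados.org/cwl#):
        #
        #   hints:
        #     arv:WorkflowRunnerResources:
        #       coresMin: 2
        #       ramMin: 2048
        #       keep_cache: 1024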

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
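
            # For example (values are illustrative): if record["output"] is the
            # portable data hash "99999999999999999999999999999999+99", a relative
            # location "out/foo.txt" in cwl.output.json is rewritten to
            # "keep:99999999999999999999999999999999+99/out/foo.txt", while
            # locations already starting with "keep:" are left untouched.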
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)