# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
import arvados.util

from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext
from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, remove it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
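
# Illustrative use (a sketch, not a call made in this excerpt): this helper is
# meant to be applied to every File/Directory object in a document via
# cwltool's visit_class, e.g.
#
#   visit_class(packed_doc, ("File", "Directory"), trim_anonymous_location)
#
# where packed_doc is a hypothetical document about to be written back out.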

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
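
# find_defaults walks an arbitrarily nested dict/list structure and applies
# `op` to every dict that carries a "default" key.  Sketch of a call, mirroring
# the call site in upload_dependencies below:
#
#   find_defaults(workflowobj, visit_default)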

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],                     # type: List[Dict[Text, Text]]
                   bindings=[],                  # type: List[Dict[Text, Any]]
                   schemaDefs={},                # type: Dict[Text, Dict[Text, Any]]
                   names=None,                   # type: Names
                   requirements=requirements,    # type: List[Dict[Text, Any]]
                   hints=hints,                  # type: List[Dict[Text, Any]]
                   resources={},                 # type: Dict[str, int]
                   mutation_manager=None,        # type: Optional[MutationManager]
                   formatgraph=None,             # type: Optional[Graph]
                   make_fs_access=None,          # type: Type[StdFsAccess]
                   fs_access=None,               # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,                     # type: float
                   debug=runtimeContext.debug,                              # type: bool
                   js_console=runtimeContext.js_console,                    # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
                   loadListing="",               # type: Text
                   outdir="",                    # type: Text
                   tmpdir="",                    # type: Text
                   stagedir="",                  # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                   )
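
# Note: make_builder constructs a minimal cwltool Builder that this module uses
# only for expression evaluation; job-specific fields (files, resources,
# outdir, ...) are intentionally left empty.  Illustrative call with
# hypothetical arguments:
#
#   builder = make_builder({"sample": None}, [], [], runtimeContext, tool.metadata)
#   value = builder.do_eval("$(inputs.sample)", context=None)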

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            # Nothing to do.
            return

        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)
                if sfname is None:
                    continue
                p_location = primary["location"]
                if "/" in p_location:
                    sfpath = (
                        p_location[0 : p_location.rindex("/") + 1]
                        + sfname)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
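
# Illustrative call (hypothetical names, mirroring upload_job_order below):
# discovered secondary files are recorded keyed by the primary file's location.
#
#   discovered = {}
#   discover_secondary_files(arvrunner.fs_access, builder,
#                            tool.tool["inputs"], job_order, discovered)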

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
                        cache=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references. Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """
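
    # Illustrative call (a sketch based on upload_workflow_deps below; names
    # here are hypothetical):
    #
    #   pm = upload_dependencies(arvrunner, "workflow dependencies",
    #                            tool.doc_loader, tool.tool, tool.tool["id"],
    #                            False, runtimeContext)
    #   for local, entry in pm.items():
    #       logger.info("%s -> %s", local, entry.resolved)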

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            if cache is not None and defrg in cache:
                return cache[defrg]
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            result = yamlloader.load(textIO)
            if cache is not None:
                cache[defrg] = result
            return result
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    with Perf(metrics, "scandeps include, location"):
        sc_result = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$include", "location")),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    with Perf(metrics, "scandeps $schemas"):
        optional_deps = scandeps(uri, scanobj,
                                 loadref_fields,
                                 set(("$schemas",)),
                                 loadref, urljoin=document_loader.fetcher.urljoin,
                                 nestdirs=False)

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}
    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)

    with Perf(metrics, "collect uuids"):
        visit_class(workflowobj, ("File", "Directory"), collect_uuids)

    with Perf(metrics, "collect uploads"):
        visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    with Perf(metrics, "fetch_uuids"):
        while fetch_uuids:
            # For a large number of fetch_uuids, the API server may limit the
            # response size, so keep fetching until it returns nothing more.
            lookups = arvrunner.api.collections().list(
                filters=[["uuid", "in", fetch_uuids]],
                count="none",
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=arvrunner.num_retries)

            if not lookups["items"]:
                break

            for l in lookups["items"]:
                uuid_map[l["uuid"]] = l["portable_data_hash"]

            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               runtimeContext,
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    with Perf(metrics, "mapper"):
        mapper = ArvPathMapper(arvrunner, sc, "",
                               "keep:%s",
                               "keep:%s/%s",
                               name=name,
                               single_collection=True,
                               optional_deps=optional_deps)

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            addkeepref(p["location"])
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    with Perf(metrics, "setloc"):
        visit_class(workflowobj, ("File", "Directory"), setloc)
        visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                                            filters=[["portable_data_hash", "in", list(keeprefs)],
                                                                     ["owner_uuid", "=", runtimeContext.project_uuid]],
                                                            select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute(num_retries=arvrunner.num_retries)
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)

def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""
    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext.project_uuid,
                                                                                                             runtimeContext.force_docker_pull,
                                                                                                             runtimeContext.tmp_outdir_prefix,
                                                                                                             runtimeContext.match_local_docker,
                                                                                                             runtimeContext.copy_deps)
            for d in v:
                visit(v[d], cur_id)
        if isinstance(v, list):
            for d in v:
                visit(d, cur_id)

    visit(packed, None)
    return packed

def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return an updated
    input object with 'location' rewritten to the proper keep references.
    """
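
    # Sketch of the effect on a hypothetical job order entry: a local path such as
    #   {"class": "File", "location": "file:///home/user/input.fastq"}
    # is rewritten to a Keep reference like
    #   {"class": "File", "location": "keep:<portable data hash>/input.fastq"}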
    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want that here,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           runtimeContext,
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False,
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
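
# FileUpdates records, for one tool document, how its file references were
# rewritten: `resolved` maps original locations to Keep locations, and
# `secondaryFiles` maps a primary file's location to the secondaryFiles
# discovered for it.  Hypothetical example:
#
#   FileUpdates(resolved={"file:///wf/ref.fa": "keep:<pdh>/ref.fa"},
#               secondaryFiles={"file:///wf/ref.fa": [{"class": "File",
#                                                      "location": "keep:<pdh>/ref.fa.fai"}]})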

def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available

    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}
    tool_dep_cache = {}
    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
                pm = upload_dependencies(arvrunner,
                                         "%s dependencies" % (shortname(deptool["id"])),
                                         document_loader,
                                         deptool,
                                         deptool["id"],
                                         False,
                                         runtimeContext,
                                         include_primary=False,
                                         discovered_secondaryfiles=discovered_secondaryfiles,
                                         cache=tool_dep_cache)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True,
                                                          runtimeContext.project_uuid,
                                                          runtimeContext.force_docker_pull,
                                                          runtimeContext.tmp_outdir_prefix,
                                                          runtimeContext.match_local_docker,
                                                          runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
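
# Illustrative use (a sketch; the name is hypothetical):
#
#   pdh = upload_workflow_collection(arvrunner, "my-workflow", packed, runtimeContext)
#   # pdh is the portable data hash of a collection containing workflow.cwl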

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):
        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        finally:
            self.arvrunner.output_callback(outputs, processStatus)