1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
14 from functools import partial
18 from collections import namedtuple
19 from io import StringIO
39 from cwltool.utils import (
45 if os.name == "posix" and sys.version_info[0] < 3:
46 import subprocess32 as subprocess
50 from schema_salad.sourceline import SourceLine, cmap
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55 shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document, jobloaderctx
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
63 import schema_salad.ref_resolver
65 import arvados.collection
67 from .util import collectionUUID
68 from ruamel.yaml import YAML
69 from ruamel.yaml.comments import CommentedMap, CommentedSeq
71 import arvados_cwl.arvdocker
72 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
73 from ._version import __version__
75 from .context import ArvRuntimeContext
76 from .perf import Perf
78 logger = logging.getLogger('arvados.cwl-runner')
79 metrics = logging.getLogger('arvados.cwl-runner.metrics')
81 def trim_anonymous_location(obj):
82 """Remove 'location' field from File and Directory literals.
84 To make internal handling easier, literals are assigned a random id for
85 'location'. However, when writing the record back out, this can break
86 reproducibility. Since it is valid for literals not to have a 'location'
91 if obj.get("location", "").startswith("_:"):
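# Example (illustrative): a File literal created internally may look like
#   {"class": "File", "location": "_:0a1b2c", "basename": "a.txt", "contents": "hi"}
# The synthetic "_:..." location is dropped here so the record round-trips
# reproducibly; real locations such as "file:///..." or "keep:..." are kept.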
95 def remove_redundant_fields(obj):
96 for field in ("path", "nameext", "nameroot", "dirname"):
101 def find_defaults(d, op):
102 if isinstance(d, list):
105 elif isinstance(d, dict):
109 for i in viewvalues(d):
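# find_defaults() walks nested lists and dicts and invokes op() on any mapping
# that carries a "default"; it is used further down as
# find_defaults(workflowobj, visit_default) to locate File/Directory defaults
# that may need to be uploaded (or treated as optional dependencies).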
112 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
115 files=[], # type: List[Dict[Text, Text]]
116 bindings=[], # type: List[Dict[Text, Any]]
117 schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
118 names=None, # type: Names
119 requirements=requirements, # type: List[Dict[Text, Any]]
120 hints=hints, # type: List[Dict[Text, Any]]
121 resources={}, # type: Dict[str, int]
122 mutation_manager=None, # type: Optional[MutationManager]
123 formatgraph=None, # type: Optional[Graph]
124 make_fs_access=None, # type: Type[StdFsAccess]
125 fs_access=None, # type: StdFsAccess
126 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
127 timeout=runtimeContext.eval_timeout, # type: float
128 debug=runtimeContext.debug, # type: bool
129 js_console=runtimeContext.js_console, # type: bool
130 force_docker_pull=runtimeContext.force_docker_pull, # type: bool
131 loadListing="", # type: Text
132 outdir="", # type: Text
133 tmpdir="", # type: Text
134 stagedir="", # type: Text
135 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
136 container_engine="docker"
139 def search_schemadef(name, reqs):
141 if r["class"] == "SchemaDefRequirement":
142 for sd in r["types"]:
143 if sd["name"] == name:
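# Example (illustrative) of the structure searched above: a requirement such as
#   {"class": "SchemaDefRequirement",
#    "types": [{"name": "#my_record", "type": "record", "fields": [...]}]}
# lets a named type like "#my_record" be resolved back to its full definition.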
147 primitive_types_set = frozenset(("null", "boolean", "int", "long",
148 "float", "double", "string", "record",
151 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
152 if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
153 # union type, collect all possible secondaryFiles
154 for i in inputschema:
155 set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
158 if inputschema == "File":
159 inputschema = {"type": "File"}
161 if isinstance(inputschema, basestring):
162 sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
168 if "secondaryFiles" in inputschema:
169 # Set secondaryFiles; this may be inherited by compound types.
170 secondaryspec = inputschema["secondaryFiles"]
172 if (isinstance(inputschema["type"], (Mapping, Sequence)) and
173 not isinstance(inputschema["type"], basestring)):
174 # compound type (union, array, record)
175 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
177 elif (inputschema["type"] == "record" and
178 isinstance(primary, Mapping)):
180 # record type, find secondary files associated with fields.
182 for f in inputschema["fields"]:
183 p = primary.get(shortname(f["name"]))
185 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
187 elif (inputschema["type"] == "array" and
188 isinstance(primary, Sequence)):
190 # array type, find secondary files of elements
193 set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
195 elif (inputschema["type"] == "File" and
196 isinstance(primary, Mapping) and
197 primary.get("class") == "File"):
199 if "secondaryFiles" in primary or not secondaryspec:
204 # Found a file, check for secondaryFiles
207 primary["secondaryFiles"] = secondaryspec
208 for i, sf in enumerate(aslist(secondaryspec)):
209 if builder.cwlVersion == "v1.0":
212 pattern = sf["pattern"]
215 if isinstance(pattern, list):
216 specs.extend(pattern)
217 elif isinstance(pattern, dict):
218 specs.append(pattern)
219 elif isinstance(pattern, str):
220 if builder.cwlVersion == "v1.0":
221 specs.append({"pattern": pattern, "required": True})
223 specs.append({"pattern": pattern, "required": sf.get("required")})
225 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
226 "Expression must return list, object, string or null")
229 for i, sf in enumerate(specs):
230 if isinstance(sf, dict):
231 if sf.get("class") == "File":
233 if sf.get("location") is None:
234 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
235 "File object is missing 'location': %s" % sf)
236 sfpath = sf["location"]
239 pattern = sf["pattern"]
240 required = sf.get("required")
241 elif isinstance(sf, str):
245 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
246 "Expression must return list, object, string or null")
248 if pattern is not None:
249 if "${" in pattern or "$(" in pattern:
250 sfname = builder.do_eval(pattern, context=primary)
252 sfname = substitute(primary["basename"], pattern)
257 if isinstance(sfname, str):
258 p_location = primary["location"]
259 if "/" in p_location:
261 p_location[0 : p_location.rindex("/") + 1]
265 required = builder.do_eval(required, context=primary)
267 if isinstance(sfname, list) or isinstance(sfname, dict):
268 each = aslist(sfname)
270 if required and not fsaccess.exists(e.get("location")):
271 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272 "Required secondary file '%s' does not exist" % e.get("location"))
275 if isinstance(sfname, str):
276 if fsaccess.exists(sfpath):
277 if pattern is not None:
278 found.append({"location": sfpath, "class": "File"})
282 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
283 "Required secondary file '%s' does not exist" % sfpath)
285 primary["secondaryFiles"] = cmap(found)
286 if discovered is not None:
287 discovered[primary["location"]] = primary["secondaryFiles"]
288 elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
289 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
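# Example (illustrative): for a primary input
#   {"class": "File", "location": "file:///data/reads.bam", "basename": "reads.bam"}
# a spec of {"pattern": ".bai", "required": True} expands via
# substitute("reads.bam", ".bai") to "reads.bam.bai", which is resolved relative
# to the primary file's location, checked for existence, and recorded in
# primary["secondaryFiles"] (and in `discovered`, keyed by the primary
# location, when a discovered dict is supplied).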
291 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
292 for inputschema in inputs:
293 primary = job_order.get(shortname(inputschema["id"]))
294 if isinstance(primary, (Mapping, Sequence)):
295 set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
297 def upload_dependencies(arvrunner, name, document_loader,
298 workflowobj, uri, runtimeContext,
299 include_primary=True, discovered_secondaryfiles=None,
301 """Upload the dependencies of the workflowobj document to Keep.
303 Returns a pathmapper object mapping local paths to keep references. Also
304 does an in-place update of references in "workflowobj".
306 Use scandeps to find $schemas, File and Directory
307 fields that represent external references.
309 If workflowobj has an "id" field, this will reload the document to ensure
310 it is scanning the raw document prior to preprocessing.
313 scanobj = workflowobj
316 with Perf(metrics, "scandeps"):
317 sc_result = scandeps(uri, scanobj,
320 None, urljoin=document_loader.fetcher.urljoin,
322 optional_deps = scandeps(uri, scanobj,
325 None, urljoin=document_loader.fetcher.urljoin,
328 if sc_result is None:
331 if optional_deps is None:
335 sc_result.extend(optional_deps)
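# Both scandeps() passes yield lists of File/Directory dicts, e.g.
#   {"class": "File", "location": "file:///path/to/input.txt"}
# (illustrative path); the optional dependencies are folded into sc_result here
# and also passed separately to ArvPathMapper below as optional_deps.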
340 def collect_uuids(obj):
341 loc = obj.get("location", "")
344 # Collect collection uuids that need to be resolved to
345 # portable data hashes
346 gp = collection_uuid_pattern.match(loc)
348 uuids[gp.groups()[0]] = obj
349 if collectionUUID in obj:
350 uuids[obj[collectionUUID]] = obj
352 def collect_uploads(obj):
353 loc = obj.get("location", "")
357 if sp[0] in ("file", "http", "https"):
358 # Record local files that need to be uploaded,
359 # don't include file literals, keep references, etc.
363 with Perf(metrics, "collect uuids"):
364 visit_class(workflowobj, ("File", "Directory"), collect_uuids)
366 with Perf(metrics, "collect uploads"):
367 visit_class(sc_result, ("File", "Directory"), collect_uploads)
369 # Resolve any collection uuids we found to portable data hashes
370 # and assign them to uuid_map
372 fetch_uuids = list(uuids.keys())
373 with Perf(metrics, "fetch_uuids"):
375 # For a large number of fetch_uuids, the API server may limit the
376 # response size, so keep fetching from the API server until it has nothing
378 lookups = arvrunner.api.collections().list(
379 filters=[["uuid", "in", fetch_uuids]],
381 select=["uuid", "portable_data_hash"]).execute(
382 num_retries=arvrunner.num_retries)
384 if not lookups["items"]:
387 for l in lookups["items"]:
388 uuid_map[l["uuid"]] = l["portable_data_hash"]
390 fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
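# After this loop, uuid_map maps each referenced collection UUID to its
# portable data hash, for example (illustrative values):
#   {"zzzzz-4zz18-zzzzzzzzzzzzzzz": "99999999999999999999999999999999+99"}
# Lookups are batched because the API server may cap the response size.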
392 normalizeFilesDirs(sc)
394 if "id" in workflowobj:
395 defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
397 # make sure it's included
398 sc.append({"class": "File", "location": defrg})
400 # make sure it's excluded
401 sc = [d for d in sc if d.get("location") != defrg]
403 def visit_default(obj):
404 def defaults_are_optional(f):
405 if "location" not in f and "path" in f:
406 f["location"] = f["path"]
408 normalizeFilesDirs(f)
409 optional_deps.append(f)
410 visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
412 find_defaults(workflowobj, visit_default)
415 def discover_default_secondary_files(obj):
416 builder_job_order = {}
417 for t in obj["inputs"]:
418 builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
419 # Need to create a builder object to evaluate expressions.
420 builder = make_builder(builder_job_order,
421 obj.get("hints", []),
422 obj.get("requirements", []),
425 discover_secondary_files(arvrunner.fs_access,
431 copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
432 visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
434 for d in list(discovered):
435 # Only interested in discovered secondaryFiles which are local
436 # files that need to be uploaded.
437 if d.startswith("file:"):
438 sc.extend(discovered[d])
442 with Perf(metrics, "mapper"):
443 mapper = ArvPathMapper(arvrunner, sc, "",
447 single_collection=True,
448 optional_deps=optional_deps)
450 for k, v in uuid_map.items():
451 mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
455 if k.startswith("keep:"):
456 keeprefs.add(collection_pdh_pattern.match(k).group(1))
460 loc = p.get("location")
461 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
462 addkeepref(p["location"])
468 if collectionUUID in p:
469 uuid = p[collectionUUID]
470 if uuid not in uuid_map:
471 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
472 "Collection uuid %s not found" % uuid)
473 gp = collection_pdh_pattern.match(loc)
474 if gp and uuid_map[uuid] != gp.groups()[0]:
475 # This file entry has both collectionUUID and a PDH
476 # location. If the PDH doesn't match the one returned by
477 # the API server, raise an error.
478 raise SourceLine(p, "location", validate.ValidationException).makeError(
479 "Expected collection uuid %s to be %s but API server reported %s" % (
480 uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
482 gp = collection_uuid_pattern.match(loc)
484 # Not a uuid pattern (must be a pdh pattern)
485 addkeepref(p["location"])
488 uuid = gp.groups()[0]
489 if uuid not in uuid_map:
490 raise SourceLine(p, "location", validate.ValidationException).makeError(
491 "Collection uuid %s not found" % uuid)
493 with Perf(metrics, "collectloc"):
494 visit_class(workflowobj, ("File", "Directory"), collectloc)
495 visit_class(discovered, ("File", "Directory"), collectloc)
497 if discovered_secondaryfiles is not None:
499 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
501 if runtimeContext.copy_deps:
502 # Find referenced collections and copy them into the
503 # destination project, for easy sharing.
504 already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
505 filters=[["portable_data_hash", "in", list(keeprefs)],
506 ["owner_uuid", "=", runtimeContext.project_uuid]],
507 select=["uuid", "portable_data_hash", "created_at"]))
509 keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
511 col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
512 order="created_at desc",
513 select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
515 if len(col["items"]) == 0:
516 logger.warning("Cannot find collection with portable data hash %s", kr)
518 col = col["items"][0]
519 col["name"] = arvados.util.trim_name(col["name"])
521 arvrunner.api.collections().create(body={"collection": {
522 "owner_uuid": runtimeContext.project_uuid,
524 "description": col["description"],
525 "properties": col["properties"],
526 "portable_data_hash": col["portable_data_hash"],
527 "manifest_text": col["manifest_text"],
528 "storage_classes_desired": col["storage_classes_desired"],
529 "trash_at": col["trash_at"]
530 }}, ensure_unique_name=True).execute()
531 except Exception as e:
532 logger.warning("Unable to copy collection to destination: %s", e)
534 if "$schemas" in workflowobj:
536 for s in workflowobj["$schemas"]:
538 sch.append(mapper.mapper(s).resolved)
539 workflowobj["$schemas"] = sch
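# Illustrative call (see upload_workflow_deps and upload_job_order below for the
# real call sites):
#   mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                runtimeContext)
# The returned pathmapper gives mapper.mapper(local_ref).resolved as the
# corresponding keep: reference.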
544 def upload_docker(arvrunner, tool, runtimeContext):
545 """Uploads Docker images used in CommandLineTool objects."""
547 if isinstance(tool, CommandLineTool):
548 (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
550 if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
551 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
552 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
554 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
556 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
557 True, runtimeContext)
558 elif isinstance(tool, cwltool.workflow.Workflow):
560 upload_docker(arvrunner, s.embedded_tool, runtimeContext)
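# Example (illustrative) of the requirement handled above:
#   {"class": "DockerRequirement", "dockerPull": "quay.io/example/tool:1.0",
#    "dockerOutputDirectory": "/out"}
# dockerOutputDirectory is rejected unless arvrunner.work_api is "containers",
# and the referenced image is fetched/uploaded via arv_docker_get_image().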
563 def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
564 """Create a packed workflow.
566 A "packed" workflow is one where all the components have been combined into a single document."""
569 packed = pack(arvrunner.loadingContext, tool.tool["id"],
570 rewrite_out=rewrites,
571 loader=tool.doc_loader)
573 rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
575 def visit(v, cur_id):
576 if isinstance(v, dict):
577 if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
578 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
579 raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
581 cur_id = rewrite_to_orig.get(v["id"], v["id"])
582 if "path" in v and "location" not in v:
583 v["location"] = v["path"]
585 if "location" in v and cur_id in merged_map:
586 if v["location"] in merged_map[cur_id].resolved:
587 v["location"] = merged_map[cur_id].resolved[v["location"]]
588 if v["location"] in merged_map[cur_id].secondaryFiles:
589 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
590 if v.get("class") == "DockerRequirement":
591 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
595 if isinstance(v, list):
602 packed[g] = git_info[g]
607 def tag_git_version(packed):
608 if tool.tool["id"].startswith("file://"):
609 path = os.path.dirname(tool.tool["id"][7:])
611 githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
612 except (OSError, subprocess.CalledProcessError):
615 packed["http://schema.org/version"] = githash
617 def setloc(mapper, p):
618 loc = p.get("location")
619 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
620 p["location"] = mapper.mapper(p["location"]).resolved
626 if collectionUUID in p:
627 uuid = p[collectionUUID]
628 keepuuid = "keep:"+uuid
629 if keepuuid not in mapper:
630 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
631 "Collection uuid %s not found" % uuid)
632 gp = collection_pdh_pattern.match(loc)
633 if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
634 # This file entry has both collectionUUID and a PDH
635 # location. If the PDH doesn't match the one returned by
636 # the API server, raise an error.
637 raise SourceLine(p, "location", validate.ValidationException).makeError(
638 "Expected collection uuid %s to be %s but API server reported %s" % (
639 uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
641 gp = collection_uuid_pattern.match(loc)
643 # Not a uuid pattern (must be a pdh pattern)
646 uuid = gp.groups()[0]
647 keepuuid = "keep:"+uuid
648 if keepuuid not in mapper:
649 raise SourceLine(p, "location", validate.ValidationException).makeError(
650 "Collection uuid %s not found" % uuid)
651 p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
652 p[collectionUUID] = uuid
654 def update_from_mapper(workflowobj, mapper):
655 with Perf(metrics, "setloc"):
656 visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
658 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
659 """Upload local files referenced in the input object and return updated input
660 object with 'location' updated to the proper keep references.
663 # Make a copy of the job order and set defaults.
664 builder_job_order = copy.copy(job_order)
666 # fill_in_defaults throws an error if there are any
667 # missing required parameters; we don't want that here,
668 # so make them all optional.
669 inputs_copy = copy.deepcopy(tool.tool["inputs"])
670 for i in inputs_copy:
671 if "null" not in i["type"]:
672 i["type"] = ["null"] + aslist(i["type"])
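# Example (illustrative): an input declared as {"id": "#main/ref", "type": "File"}
# becomes {"id": "#main/ref", "type": ["null", "File"]} here, so fill_in_defaults
# below will not fail on required parameters the job order leaves unset.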
674 fill_in_defaults(inputs_copy,
677 # Need to create a builder object to evaluate expressions.
678 builder = make_builder(builder_job_order,
683 # Now update job_order with secondaryFiles
684 discover_secondary_files(arvrunner.fs_access,
689 _jobloaderctx = jobloaderctx.copy()
690 jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
692 jobmapper = upload_dependencies(arvrunner,
696 job_order.get("id", "#"),
699 if "id" in job_order:
702 # Need to filter this out; it gets added by cwltool when providing
703 # parameters on the command line.
704 if "job_order" in job_order:
705 del job_order["job_order"]
707 update_from_mapper(job_order, jobmapper)
709 #print(json.dumps(job_order, indent=2))
713 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
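# merged_map (built by upload_workflow_deps below) maps each tool document id to
# a FileUpdates tuple, roughly (illustrative paths):
#   {"file:///home/user/wf.cwl": FileUpdates(
#        resolved={"file:///home/user/input.txt": "keep:<pdh>/input.txt"},
#        secondaryFiles={...})}
# packed_workflow() above uses it to rewrite locations in the packed document.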
715 def upload_workflow_deps(arvrunner, tool, runtimeContext):
716 # Ensure that Docker images needed by this workflow are available
718 with Perf(metrics, "upload_docker"):
719 upload_docker(arvrunner, tool, runtimeContext)
721 document_loader = tool.doc_loader
728 # Standard traversal is top down; we want to go bottom up, so use
729 # the visitor to accumulate a list of nodes to visit, then
730 # visit them in reverse order.
731 def upload_tool_deps(deptool):
735 tool.visit(upload_tool_deps)
737 for deptool in reversed(todo):
738 discovered_secondaryfiles = {}
739 with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
740 pm = upload_dependencies(arvrunner,
741 "%s dependencies" % (shortname(deptool["id"])),
746 include_primary=False,
747 discovered_secondaryfiles=discovered_secondaryfiles,
748 cache=tool_dep_cache)
750 document_loader.idx[deptool["id"]] = deptool
752 for k,v in pm.items():
753 toolmap[k] = v.resolved
754 merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
758 def arvados_jobs_image(arvrunner, img, runtimeContext):
759 """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
762 return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
763 True, runtimeContext)
764 except Exception as e:
765 raise Exception("Docker image %s is not available\n%s" % (img, e))
768 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
769 collection = arvados.collection.Collection(api_client=arvrunner.api,
770 keep_client=arvrunner.keep_client,
771 num_retries=arvrunner.num_retries)
772 with collection.open("workflow.cwl", "w") as f:
773 f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
775 filters = [["portable_data_hash", "=", collection.portable_data_hash()],
776 ["name", "like", name+"%"]]
777 if runtimeContext.project_uuid:
778 filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
779 exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
782 logger.info("Using collection %s", exists["items"][0]["uuid"])
784 collection.save_new(name=name,
785 owner_uuid=runtimeContext.project_uuid,
786 ensure_unique_name=True,
787 num_retries=arvrunner.num_retries)
788 logger.info("Uploaded to %s", collection.manifest_locator())
790 return collection.portable_data_hash()
793 class Runner(Process):
794 """Base class for runner processes, which submit an instance of
795 arvados-cwl-runner and wait for the final result."""
797 def __init__(self, runner, updated_tool,
798 tool, loadingContext, enable_reuse,
799 output_name, output_tags, submit_runner_ram=0,
800 name=None, on_error=None, submit_runner_image=None,
801 intermediate_output_ttl=0, merged_map=None,
802 priority=None, secret_store=None,
803 collection_cache_size=256,
804 collection_cache_is_default=True,
807 self.loadingContext = loadingContext.copy()
808 self.loadingContext.metadata = updated_tool.metadata.copy()
810 super(Runner, self).__init__(updated_tool.tool, loadingContext)
812 self.arvrunner = runner
813 self.embedded_tool = tool
814 self.job_order = None
817 # If reuse is permitted by command line arguments but
818 # disabled by the workflow itself, disable it.
819 reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
821 enable_reuse = reuse_req["enableReuse"]
822 reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
824 enable_reuse = reuse_req["enableReuse"]
825 self.enable_reuse = enable_reuse
827 self.final_output = None
828 self.output_name = output_name
829 self.output_tags = output_tags
831 self.on_error = on_error
832 self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
833 self.intermediate_output_ttl = intermediate_output_ttl
834 self.priority = priority
835 self.secret_store = secret_store
836 self.enable_dev = self.loadingContext.enable_dev
837 self.git_info = git_info
838 self.fast_parser = self.loadingContext.fast_parser
840 self.submit_runner_cores = 1
841 self.submit_runner_ram = 1024 # default 1 GiB
842 self.collection_cache_size = collection_cache_size
844 runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
845 if runner_resource_req:
846 if runner_resource_req.get("coresMin"):
847 self.submit_runner_cores = runner_resource_req["coresMin"]
848 if runner_resource_req.get("ramMin"):
849 self.submit_runner_ram = runner_resource_req["ramMin"]
850 if runner_resource_req.get("keep_cache") and collection_cache_is_default:
851 self.collection_cache_size = runner_resource_req["keep_cache"]
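# Example (illustrative) of the hint read above, as it might appear in a
# workflow (assuming "arv:" is mapped to http://arvados.org/cwl# in $namespaces):
#   hints:
#     arv:WorkflowRunnerResources:
#       coresMin: 2
#       ramMin: 2048
#       keep_cache: 512
# ramMin and keep_cache are in MiB; an explicit --submit-runner-ram on the
# command line still overrides this (see below).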
853 if submit_runner_ram:
854 # Command line / initializer overrides default and/or spec from workflow
855 self.submit_runner_ram = submit_runner_ram
857 if self.submit_runner_ram <= 0:
858 raise Exception("Value of submit-runner-ram must be greater than zero")
860 if self.submit_runner_cores <= 0:
861 raise Exception("Value of submit-runner-cores must be greater than zero")
863 self.merged_map = merged_map or {}
866 job_order, # type: Mapping[Text, Text]
867 output_callbacks, # type: Callable[[Any, Any], Any]
868 runtimeContext # type: RuntimeContext
869 ): # type: (...) -> Generator[Any, None, None]
870 self.job_order = job_order
871 self._init_job(job_order, runtimeContext)
874 def update_pipeline_component(self, record):
877 def done(self, record):
878 """Base method for handling a completed runner."""
881 if record["state"] == "Complete":
882 if record.get("exit_code") is not None:
883 if record["exit_code"] == 33:
884 processStatus = "UnsupportedRequirement"
885 elif record["exit_code"] == 0:
886 processStatus = "success"
888 processStatus = "permanentFail"
890 processStatus = "success"
892 processStatus = "permanentFail"
896 if processStatus == "permanentFail":
897 logc = arvados.collection.CollectionReader(record["log"],
898 api_client=self.arvrunner.api,
899 keep_client=self.arvrunner.keep_client,
900 num_retries=self.arvrunner.num_retries)
901 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
903 self.final_output = record["output"]
904 outc = arvados.collection.CollectionReader(self.final_output,
905 api_client=self.arvrunner.api,
906 keep_client=self.arvrunner.keep_client,
907 num_retries=self.arvrunner.num_retries)
908 if "cwl.output.json" in outc:
909 with outc.open("cwl.output.json", "rb") as f:
911 outputs = json.loads(f.read().decode())
912 def keepify(fileobj):
913 path = fileobj["location"]
914 if not path.startswith("keep:"):
915 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
916 adjustFileObjs(outputs, keepify)
917 adjustDirObjs(outputs, keepify)
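# Example (illustrative): with record["output"] == "99999999999999999999999999999999+99",
# keepify rewrites {"location": "foo/bar.txt"} to
# {"location": "keep:99999999999999999999999999999999+99/foo/bar.txt"};
# locations already starting with "keep:" are left untouched.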
919 logger.exception("[%s] While getting final output object", self.name)
920 self.arvrunner.output_callback({}, "permanentFail")
922 self.arvrunner.output_callback(outputs, processStatus)