# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import copy
import json
import logging
import os
import subprocess
import urllib.parse

from functools import partial
from collections import namedtuple
from collections.abc import Mapping, Sequence
from io import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document, jobloaderctx
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
import schema_salad.ref_resolver
from cwltool.secrets import SecretStore

import arvados.collection
import arvados.util

from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from . import done
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
from ._version import __version__
from .context import ArvRuntimeContext
from .perf import Perf
basestring = (bytes, str)

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]
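
# Illustration (not part of the call flow here; "outputobj" is a placeholder name):
# both helpers above are written to be applied over a CWL object tree with
# cwltool's visit_class, e.g.
#
#   visit_class(outputobj, ("File", "Directory"), trim_anonymous_location)
#   visit_class(outputobj, ("File", "Directory"), remove_redundant_fields)
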
def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.values():
                find_defaults(i, op)
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],                      # type: List[Dict[Text, Text]]
                   bindings=[],                   # type: List[Dict[Text, Any]]
                   schemaDefs={},                 # type: Dict[Text, Dict[Text, Any]]
                   names=None,                    # type: Names
                   requirements=requirements,     # type: List[Dict[Text, Any]]
                   hints=hints,                   # type: List[Dict[Text, Any]]
                   resources={},                  # type: Dict[str, int]
                   mutation_manager=None,         # type: Optional[MutationManager]
                   formatgraph=None,              # type: Optional[Graph]
                   make_fs_access=None,           # type: Type[StdFsAccess]
                   fs_access=None,                # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,                     # type: float
                   debug=runtimeContext.debug,                              # type: bool
                   js_console=runtimeContext.js_console,                    # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
                   loadListing="",                # type: Text
                   outdir="",                     # type: Text
                   tmpdir="",                     # type: Text
                   stagedir="",                   # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                  )
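
# Usage sketch (values are illustrative): make_builder() returns a minimal cwltool
# Builder that is only used to evaluate expressions (for example secondaryFiles
# patterns) against a job order, not to actually run a tool:
#
#   builder = make_builder(builder_job_order, obj.get("hints", []),
#                          obj.get("requirements", []), runtimeContext, tool.metadata)
#   sfname = builder.do_eval("$(inputs.reads.basename).bai", context=primary)
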
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None
primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            # Nothing to do.
            return

        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = sf
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)

                if sfname is None:
                    continue

                if isinstance(sfname, str):
                    p_location = primary["location"]
                    if "/" in p_location:
                        sfpath = (
                            p_location[0 : p_location.rindex("/") + 1]
                            + sfname
                        )

            required = builder.do_eval(required, context=primary)

            if isinstance(sfname, list) or isinstance(sfname, dict):
                each = aslist(sfname)
                for e in each:
                    if required and not fsaccess.exists(e.get("location")):
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "Required secondary file '%s' does not exist" % e.get("location"))
                found.extend(each)

            if isinstance(sfname, str):
                if fsaccess.exists(sfpath):
                    if pattern is not None:
                        found.append({"location": sfpath, "class": "File"})
                    else:
                        found.append(sf)
                elif required:
                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                        "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
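
# Sketch of the effect (paths are illustrative): given a primary input
#   {"class": "File", "location": "keep:<pdh>/reads.bam"}
# and a schema secondaryFiles spec {"pattern": ".bai", "required": true},
# set_secondary() appends {"class": "File", "location": "keep:<pdh>/reads.bam.bai"}
# to primary["secondaryFiles"] (and records it in `discovered`, keyed by the
# primary's location), provided the file exists according to fsaccess.
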
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
                        cache=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $schemas, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    scanobj = workflowobj

    with Perf(metrics, "scandeps"):
        sc_result = scandeps(uri, scanobj,
                             None, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)
        optional_deps = scandeps(uri, scanobj,
                                 None, urljoin=document_loader.fetcher.urljoin,
                                 nestdirs=False)

    if sc_result is None:
        sc_result = []

    if optional_deps is None:
        optional_deps = []

    sc_result.extend(optional_deps)
    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        # Collect collection uuids that need to be resolved to
        # portable data hashes
        gp = collection_uuid_pattern.match(loc)
        if gp:
            uuids[gp.groups()[0]] = obj
        if collectionUUID in obj:
            uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)

    with Perf(metrics, "collect uuids"):
        visit_class(workflowobj, ("File", "Directory"), collect_uuids)

    with Perf(metrics, "collect uploads"):
        visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    with Perf(metrics, "fetch_uuids"):
        while fetch_uuids:
            # For a large number of fetch_uuids, the API server may limit
            # response size, so keep fetching until it has nothing
            # more to give us.
            lookups = arvrunner.api.collections().list(
                filters=[["uuid", "in", fetch_uuids]],
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=arvrunner.num_retries)

            if not lookups["items"]:
                break

            for l in lookups["items"]:
                uuid_map[l["uuid"]] = l["portable_data_hash"]

            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
        if include_primary:
            # make sure it's included
            sc.append({"class": "File", "location": defrg})
        else:
            # make sure it's excluded
            sc = [d for d in sc if d.get("location") != defrg]

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}

    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
        discover_secondary_files(arvrunner.fs_access,

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
    with Perf(metrics, "mapper"):
        mapper = ArvPathMapper(arvrunner, sc, "",
                               "keep:%s",
                               "keep:%s/%s",
                               single_collection=True,
                               optional_deps=optional_deps)

    for k, v in uuid_map.items():
        mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)

    keeprefs = set()

    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def collectloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            addkeepref(p["location"])

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)

    with Perf(metrics, "collectloc"):
        visit_class(workflowobj, ("File", "Directory"), collectloc)
        visit_class(discovered, ("File", "Directory"), collectloc)
    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                               filters=[["portable_data_hash", "in", list(keeprefs)],
                                        ["owner_uuid", "=", runtimeContext.project_uuid]],
                               select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   ).execute(num_retries=arvrunner.num_retries)
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            col["name"] = arvados.util.trim_name(col["name"])
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
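
# Usage sketch (argument values are illustrative): upload everything a tool document
# references and record how local paths map to Keep:
#
#   mapper = upload_dependencies(arvrunner, "workflow dependencies", tool.doc_loader,
#                                tool.tool, tool.tool["id"], runtimeContext)
#   mapper.mapper("file:///home/me/input.fastq").resolved  # e.g. "keep:<pdh>/input.fastq"
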
def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, runtimeContext)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)
def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in rewrites.items()}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                else:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)

    if git_info:
        for g in git_info:
            packed[g] = git_info[g]

    return packed
def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash
def setloc(mapper, p):
    loc = p.get("location")
    if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
        p["location"] = mapper.mapper(p["location"]).resolved

    if collectionUUID in p:
        uuid = p[collectionUUID]
        keepuuid = "keep:"+uuid
        if keepuuid not in mapper:
            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        gp = collection_pdh_pattern.match(loc)
        if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
            # This file entry has both collectionUUID and a PDH
            # location.  If the PDH doesn't match the one returned
            # by the API server, raise an error.
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Expected collection uuid %s to be %s but API server reported %s" % (
                    uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))

    gp = collection_uuid_pattern.match(loc)
    if not gp:
        # Not a uuid pattern (must be a pdh pattern)
        return

    uuid = gp.groups()[0]
    keepuuid = "keep:"+uuid
    if keepuuid not in mapper:
        raise SourceLine(p, "location", validate.ValidationException).makeError(
            "Collection uuid %s not found" % uuid)
    p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
    p[collectionUUID] = uuid
def update_from_mapper(workflowobj, mapper):
    with Perf(metrics, "setloc"):
        visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
def apply_merged_map(merged_map, workflowobj):
    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if "id" in v:
                    cur_id = v["id"]
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            #if v.get("class") == "DockerRequirement":
            #    v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
            #                                                                                                 runtimeContext)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(workflowobj, None)
def update_from_merged_map(tool, merged_map):
    tool.visit(partial(apply_merged_map, merged_map))
def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want that,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           runtimeContext,
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    _jobloaderctx = jobloaderctx.copy()
    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    jobloader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    update_from_mapper(job_order, jobmapper)

    return job_order, jobmapper
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
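
# Illustrative shape of merged_map entries built from FileUpdates: each tool document id
# maps to the location rewrites and discovered secondaryFiles for that document, e.g.
#
#   merged_map["file:///home/me/wf.cwl#step1"] = FileUpdates(
#       resolved={"file:///home/me/ref.fa": "keep:<pdh>/ref.fa"},
#       secondaryFiles={"keep:<pdh>/ref.fa": [{"class": "File", "location": "keep:<pdh>/ref.fa.fai"}]})
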
def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available
    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}
    tool_dep_cache = {}

    todo = []

    # Standard traversal is top down, we want to go bottom up, so use
    # the visitor to accumulate a list of nodes to visit, then
    # visit them in reverse order.
    def upload_tool_deps(deptool):
        if "id" in deptool:
            todo.append(deptool)

    tool.visit(upload_tool_deps)

    for deptool in reversed(todo):
        discovered_secondaryfiles = {}
        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     runtimeContext,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles,
                                     cache=tool_dep_cache)

        document_loader.idx[deptool["id"]] = deptool
        toolmap = {}
        for k,v in pm.items():
            toolmap[k] = v.resolved

        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    return merged_map
def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True, runtimeContext)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
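
# Usage sketch (name is illustrative): store the packed workflow as "workflow.cwl" in a
# collection, reusing an existing identical collection when one is found, and get back
# its portable data hash:
#
#   pdh = upload_workflow_collection(arvrunner, "my-workflow.cwl", packed, runtimeContext)
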
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True,
                 git_info=None,
                 reuse_runner=False):

        self.loadingContext = loadingContext.copy()

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None

        # If reuse is permitted by command line arguments but
        # disabled by the workflow itself, disable it.
        reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
        if reuse_req:
            enable_reuse = reuse_req["enableReuse"]
        reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
        if reuse_req:
            enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse

        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = self.loadingContext.enable_dev
        self.git_info = git_info
        self.fast_parser = self.loadingContext.fast_parser
        self.reuse_runner = reuse_runner

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
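
    # Illustrative workflow hint (assumes the "arv" namespace prefix is declared in the
    # workflow document): a workflow can size its own runner container, which is what the
    # WorkflowRunnerResources handling above reads.
    #
    #   hints:
    #     arv:WorkflowRunnerResources:
    #       coresMin: 2
    #       ramMin: 4096       # MiB
    #       keep_cache: 1024   # MiB; only used while the default cache size is in effect
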
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
    def update_pipeline_component(self, record):
        pass
    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
                             include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    outputs = json.loads(str(f.read(), 'utf-8'))
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
def print_keep_deps_visitor(api, runtimeContext, references, doc_loader, tool):
    def collect_locators(obj):
        loc = obj.get("location", "")

        g = arvados.util.keepuri_pattern.match(loc)
        if g:
            references.add(g[1])

        if obj.get("class") == "http://arvados.org/cwl#WorkflowRunnerResources" and "acrContainerImage" in obj:
            references.add(obj["acrContainerImage"])

        if obj.get("class") == "DockerRequirement":
            references.add(arvados_cwl.arvdocker.arv_docker_get_image(api, obj, False, runtimeContext))

    sc_result = scandeps(tool["id"], tool,
                         set(),
                         set(("location", "id")),
                         None, urljoin=doc_loader.fetcher.urljoin,
                         nestdirs=False)

    visit_class(sc_result, ("File", "Directory"), collect_locators)
    visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), collect_locators)
def print_keep_deps(arvRunner, runtimeContext, merged_map, tool):
    references = set()

    tool.visit(partial(print_keep_deps_visitor, arvRunner.api, runtimeContext, references, tool.doc_loader))

    for mm in merged_map:
        for k, v in merged_map[mm].resolved.items():
            g = arvados.util.keepuri_pattern.match(v)
            if g:
                references.add(g[1])

    json.dump(sorted(references), arvRunner.stdout)
    print(file=arvRunner.stdout)
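
# Example output of print_keep_deps() (values are illustrative): a sorted JSON list of
# the Keep references (portable data hashes, including Docker image collections) that
# the workflow depends on, e.g.
#
#   ["99999999999999999999999999999999+118", "fa7aeb5140e2848d39b416876b1e37bf+45"]
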
class ArvSecretStore(SecretStore):
    # SecretStore variant that passes None values through instead of storing them.
    def add(self, value):
        if value is None:
            return None
        return super().add(value)