# Copyright (C) The Arvados Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import copy
import json
import logging
import os
import subprocess
import urllib.parse

from functools import partial
from collections import namedtuple
from collections.abc import Mapping, Sequence
from io import StringIO

from cwltool.utils import (
from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document, jobloaderctx
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
import schema_salad.ref_resolver

import arvados.collection
import arvados.util

from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
from ._version import __version__
from .context import ArvRuntimeContext
from .perf import Perf
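# Stand-in for Python 2's basestring, used below for isinstance() checks on
# string-like values.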
basestring = (bytes, str)
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, filter it out.
    """

    if obj.get("location", "").startswith("_:"):
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):

def find_defaults(d, op):
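    """Recursively call `op` on every mapping in `d` that defines a "default".

    Used when scanning workflow documents so that input defaults can be
    treated as optional dependencies.
    """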
    if isinstance(d, list):
    elif isinstance(d, dict):
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
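    """Return a cwltool Builder suitable for evaluating CWL expressions
    (for example secondaryFiles patterns) outside of a running step.

    Most fields are empty placeholders; only the supplied hints,
    requirements, and a few runtimeContext settings are carried through.
    """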
                   files=[],                  # type: List[Dict[Text, Text]]
                   bindings=[],               # type: List[Dict[Text, Any]]
                   schemaDefs={},             # type: Dict[Text, Dict[Text, Any]]
                   names=None,                # type: Names
                   requirements=requirements, # type: List[Dict[Text, Any]]
                   hints=hints,               # type: List[Dict[Text, Any]]
                   resources={},              # type: Dict[str, int]
                   mutation_manager=None,     # type: Optional[MutationManager]
                   formatgraph=None,          # type: Optional[Graph]
                   make_fs_access=None,       # type: Type[StdFsAccess]
                   fs_access=None,            # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout, # type: float
                   debug=runtimeContext.debug, # type: bool
                   js_console=runtimeContext.js_console, # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull, # type: bool
                   loadListing="",            # type: Text
                   outdir="",                 # type: Text
                   tmpdir="",                 # type: Text
                   stagedir="",               # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
def search_schemadef(name, reqs):
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
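    """Recursively walk an input schema alongside the corresponding value from
    the job order, expanding secondaryFiles specs on File inputs and recording
    each secondary file found in `discovered`, keyed by the primary file's
    location.
    """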
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:

        # Found a file, check for secondaryFiles
        primary["secondaryFiles"] = secondaryspec
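        # Each spec is either an expression or a pattern object, e.g.
        # {"pattern": ".bai", "required": True} appends the suffix
        # ("sample.bam" -> "sample.bam.bai"), while a leading "^" as in
        # "^.bai" replaces the extension ("sample.bam" -> "sample.bai").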
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = sf["pattern"]
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                    specs.append({"pattern": pattern, "required": sf.get("required")})
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                    sfname = substitute(primary["basename"], pattern)

                if isinstance(sfname, str):
                    p_location = primary["location"]
                    if "/" in p_location:
                            p_location[0 : p_location.rindex("/") + 1]

            required = builder.do_eval(required, context=primary)

            if isinstance(sfname, list) or isinstance(sfname, dict):
                each = aslist(sfname)
                    if required and not fsaccess.exists(e.get("location")):
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "Required secondary file '%s' does not exist" % e.get("location"))

            if isinstance(sfname, str):
                if fsaccess.exists(sfpath):
                    if pattern is not None:
                        found.append({"location": sfpath, "class": "File"})
                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                        "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
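# Apply set_secondary() to each top-level workflow input that has a value in
# the job order, filling in secondaryFiles and noting what was discovered.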
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $schemas, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    scanobj = workflowobj
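    # scandeps() below walks the document for File/Directory references (and
    # $schemas); anything local is later uploaded to Keep and the references
    # are rewritten in place in workflowobj.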
    with Perf(metrics, "scandeps"):
        sc_result = scandeps(uri, scanobj,
                             None, urljoin=document_loader.fetcher.urljoin,
        optional_deps = scandeps(uri, scanobj,
                                 None, urljoin=document_loader.fetcher.urljoin,

    if sc_result is None:

    if optional_deps is None:

        sc_result.extend(optional_deps)

    def collect_uuids(obj):
        loc = obj.get("location", "")
        # Collect collection uuids that need to be resolved to
        # portable data hashes
        gp = collection_uuid_pattern.match(loc)
            uuids[gp.groups()[0]] = obj
        if collectionUUID in obj:
            uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.

    with Perf(metrics, "collect uuids"):
        visit_class(workflowobj, ("File", "Directory"), collect_uuids)

    with Perf(metrics, "collect uploads"):
        visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    fetch_uuids = list(uuids.keys())
    with Perf(metrics, "fetch_uuids"):
            # For a large number of fetch_uuids, the API server may limit
            # response size, so keep fetching until the API server has nothing
            # more to give us.
            lookups = arvrunner.api.collections().list(
                filters=[["uuid", "in", fetch_uuids]],
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=arvrunner.num_retries)

            if not lookups["items"]:

            for l in lookups["items"]:
                uuid_map[l["uuid"]] = l["portable_data_hash"]

            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)
    if "id" in workflowobj:
        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
            # make sure it's included
            sc.append({"class": "File", "location": defrg})
            # make sure it's excluded
            sc = [d for d in sc if d.get("location") != defrg]

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
        discover_secondary_files(arvrunner.fs_access,

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
    with Perf(metrics, "mapper"):
        mapper = ArvPathMapper(arvrunner, sc, "",
                               single_collection=True,
                               optional_deps=optional_deps)

    for k, v in uuid_map.items():
        mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)

        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            addkeepref(p["location"])

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
    with Perf(metrics, "collectloc"):
        visit_class(workflowobj, ("File", "Directory"), collectloc)
        visit_class(discovered, ("File", "Directory"), collectloc)

    if discovered_secondaryfiles is not None:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                             filters=[["portable_data_hash", "in", list(keeprefs)],
                                                      ["owner_uuid", "=", runtimeContext.project_uuid]],
                                             select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)

            col = col["items"][0]
            col["name"] = arvados.util.trim_name(col["name"])

                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch
def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, runtimeContext)
    elif isinstance(tool, cwltool.workflow.Workflow):
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)
def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""
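    # cwltool's pack() produces a single document, typically an object whose
    # "$graph" list holds the top-level workflow ("#main") together with every
    # tool it references.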
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in rewrites.items()}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or upgrade to cwlVersion: v1.1")
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
        if isinstance(v, list):
                packed[g] = git_info[g]
def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            packed["http://schema.org/version"] = githash
def setloc(mapper, p):
    loc = p.get("location")
    if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
        p["location"] = mapper.mapper(p["location"]).resolved

    if collectionUUID in p:
        uuid = p[collectionUUID]
        keepuuid = "keep:"+uuid
        if keepuuid not in mapper:
            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        gp = collection_pdh_pattern.match(loc)
        if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
            # This file entry has both collectionUUID and a PDH
            # location. If the PDH doesn't match the one returned by
            # the API server, raise an error.
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Expected collection uuid %s to be %s but API server reported %s" % (
                    uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))

    gp = collection_uuid_pattern.match(loc)
        # Not a uuid pattern (must be a pdh pattern)

    uuid = gp.groups()[0]
    keepuuid = "keep:"+uuid
    if keepuuid not in mapper:
        raise SourceLine(p, "location", validate.ValidationException).makeError(
            "Collection uuid %s not found" % uuid)
    p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
    p[collectionUUID] = uuid
def update_from_mapper(workflowobj, mapper):
    with Perf(metrics, "setloc"):
        visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
def apply_merged_map(merged_map, workflowobj):
    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            #if v.get("class") == "DockerRequirement":
            #    v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
        if isinstance(v, list):
    visit(workflowobj, None)
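# Rewrite file references in each embedded process of the tool using the
# location and secondaryFiles updates accumulated in merged_map.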
def update_from_merged_map(tool, merged_map):
    tool.visit(partial(apply_merged_map, merged_map))
def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want that here,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,

    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,

    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,

    _jobloaderctx = jobloaderctx.copy()
    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)

    jobmapper = upload_dependencies(arvrunner,
                                    job_order.get("id", "#"),

    if "id" in job_order:

    # Need to filter this out; it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    update_from_mapper(job_order, jobmapper)
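    # At this point File entries in job_order refer to Keep rather than the
    # local filesystem, e.g. (illustrative values only)
    #   {"class": "File", "location": "file:///tmp/reads.fastq"}
    # has become
    #   {"class": "File", "location": "keep:99999999999999999999999999999999+118/reads.fastq"}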
    return job_order, jobmapper
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
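# merged_map values: for each tool document id, "resolved" maps original file
# locations to Keep locations and "secondaryFiles" records any secondary files
# discovered while uploading that tool's dependencies.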
def upload_workflow_deps(arvrunner, tool, runtimeContext):
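    """Upload the dependencies of every tool in the workflow to Keep.

    Returns a merged_map of FileUpdates keyed by each tool's document id.
    """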
    # Ensure that Docker images needed by this workflow are available
    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    # Standard traversal is top down, but we want to go bottom up, so use
    # the visitor to accumulate a list of nodes to visit, then
    # visit them in reverse order.
    def upload_tool_deps(deptool):

    tool.visit(upload_tool_deps)

    for deptool in reversed(todo):
        discovered_secondaryfiles = {}
        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles,
                                     cache=tool_dep_cache)

        document_loader.idx[deptool["id"]] = deptool
        for k,v in pm.items():
            toolmap[k] = v.resolved

        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True, runtimeContext)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

        logger.info("Using collection %s", exists["items"][0]["uuid"])
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True,

        self.loadingContext = loadingContext.copy()

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = self.loadingContext.enable_dev
        self.git_info = git_info
        self.fast_parser = self.loadingContext.fast_parser
        self.reuse_runner = reuse_runner

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
    def update_pipeline_component(self, record):

    def done(self, record):
        """Base method for handling a completed runner."""

            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
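                    # An exit code of 33 from the runner container is treated
                    # as hitting an unsupported requirement.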
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                        processStatus = "permanentFail"
                    processStatus = "success"
                processStatus = "permanentFail"

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
                             include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
            self.arvrunner.output_callback(outputs, processStatus)
def print_keep_deps_visitor(api, runtimeContext, references, doc_loader, tool):
    def collect_locators(obj):
        loc = obj.get("location", "")

        g = arvados.util.keepuri_pattern.match(loc)

        if obj.get("class") == "http://arvados.org/cwl#WorkflowRunnerResources" and "acrContainerImage" in obj:
            references.add(obj["acrContainerImage"])

        if obj.get("class") == "DockerRequirement":
            references.add(arvados_cwl.arvdocker.arv_docker_get_image(api, obj, False, runtimeContext))

    sc_result = scandeps(tool["id"], tool,
                         set(("location", "id")),
                         None, urljoin=doc_loader.fetcher.urljoin,

    visit_class(sc_result, ("File", "Directory"), collect_locators)
    visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), collect_locators)
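# Collect every Keep reference (collection data as well as Docker images) used
# by the workflow and its tools, then write them to stdout as a sorted JSON
# array.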
def print_keep_deps(arvRunner, runtimeContext, merged_map, tool):
    tool.visit(partial(print_keep_deps_visitor, arvRunner.api, runtimeContext, references, tool.doc_loader))

    for mm in merged_map:
        for k, v in merged_map[mm].resolved.items():
            g = arvados.util.keepuri_pattern.match(v)

    json.dump(sorted(references), arvRunner.stdout)
    print(file=arvRunner.stdout)