# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import copy
import json
import logging
import os
import subprocess
import urllib.parse

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

from schema_salad.sourceline import SourceLine, cmap
import schema_salad.validate as validate
import schema_salad.ref_resolver

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document, jobloaderctx
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute, Builder
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION

import arvados.collection
import arvados.util
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
from .util import collectionUUID
from ._version import __version__
from .context import ArvRuntimeContext
from .perf import Perf
from . import done

basestring = (bytes, str)
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

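# Illustrative usage sketch (not part of the original module; values assumed):
# this helper is typically applied over every File/Directory node with
# cwltool's visit_class, e.g.
#
#   out = {"o": {"class": "File", "location": "_:b0a1", "basename": "a.txt"}}
#   visit_class(out, ("File", "Directory"), trim_anonymous_location)
#   # out["o"] now has no 'location' field
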
def remove_redundant_fields(obj):
    """Drop computed File/Directory fields that can be derived from 'location'."""
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    """Recursively apply op() to every dict in d that has a 'default' field."""
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.values():
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
        job=joborder,
        files=[],                    # type: List[Dict[Text, Text]]
        bindings=[],                 # type: List[Dict[Text, Any]]
        schemaDefs={},               # type: Dict[Text, Dict[Text, Any]]
        names=None,                  # type: Names
        requirements=requirements,   # type: List[Dict[Text, Any]]
        hints=hints,                 # type: List[Dict[Text, Any]]
        resources={},                # type: Dict[str, int]
        mutation_manager=None,       # type: Optional[MutationManager]
        formatgraph=None,            # type: Optional[Graph]
        make_fs_access=None,         # type: Type[StdFsAccess]
        fs_access=None,              # type: StdFsAccess
        job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
        timeout=runtimeContext.eval_timeout,                     # type: float
        debug=runtimeContext.debug,                              # type: bool
        js_console=runtimeContext.js_console,                    # type: bool
        force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
        loadListing="",              # type: Text
        outdir="",                   # type: Text
        tmpdir="",                   # type: Text
        stagedir="",                 # type: Text
        cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
        container_engine="docker"
    )

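# Illustrative call sketch (arguments assumed from usage elsewhere in this
# module): a Builder constructed this way is used only to evaluate expressions,
# e.g.
#
#   builder = make_builder(job_order, tool.hints, tool.requirements,
#                          runtimeContext, tool.metadata)
#   sfname = builder.do_eval("$(inputs.bam.basename).bai", context=primary)
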
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            # Nothing to do.
            return

        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = sf
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            sfname = None
            sfpath = None
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)

                if sfname is None:
                    continue

                if isinstance(sfname, str):
                    p_location = primary["location"]
                    if "/" in p_location:
                        sfpath = (
                            p_location[0 : p_location.rindex("/") + 1]
                            + sfname
                        )

            required = builder.do_eval(required, context=primary)

            if isinstance(sfname, list) or isinstance(sfname, dict):
                each = aslist(sfname)
                for e in each:
                    if required and not fsaccess.exists(e.get("location")):
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "Required secondary file '%s' does not exist" % e.get("location"))
                found.extend(each)

            if isinstance(sfname, str):
                if fsaccess.exists(sfpath):
                    if pattern is not None:
                        found.append({"location": sfpath, "class": "File"})
                    else:
                        found.append(sf)
                elif required:
                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                        "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

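# Illustrative sketch of the pattern semantics used above (values assumed):
# cwltool's substitute() appends the pattern to the primary basename, and each
# leading '^' strips one extension first, e.g.
#
#   substitute("reads.bam", ".bai")   # -> "reads.bam.bai"
#   substitute("reads.bam", "^.bai")  # -> "reads.bai"
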
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
                        cache=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $schemas, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    scanobj = workflowobj
    metadata = scanobj

    with Perf(metrics, "scandeps"):
        sc_result = scandeps(uri, scanobj,
                             set(),
                             set(("location",)),
                             None, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)
        optional_deps = scandeps(uri, scanobj,
                                 set(),
                                 set(("$schemas",)),
                                 None, urljoin=document_loader.fetcher.urljoin,
                                 nestdirs=False)

    if sc_result is None:
        sc_result = []

    if optional_deps is None:
        optional_deps = []

    if optional_deps:
        sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    with Perf(metrics, "collect uuids"):
        visit_class(workflowobj, ("File", "Directory"), collect_uuids)

    with Perf(metrics, "collect uploads"):
        visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    with Perf(metrics, "fetch_uuids"):
        while fetch_uuids:
            # For a large number of fetch_uuids, the API server may limit the
            # response size, so keep fetching until the API server has nothing
            # more to give us.
            lookups = arvrunner.api.collections().list(
                filters=[["uuid", "in", fetch_uuids]],
                count="none",
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=arvrunner.num_retries)

            if not lookups["items"]:
                break

            for l in lookups["items"]:
                uuid_map[l["uuid"]] = l["portable_data_hash"]

            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
        if include_primary:
            # make sure it's included
            sc.append({"class": "File", "location": defrg})
        else:
            # make sure it's excluded
            sc = [d for d in sc if d.get("location") != defrg]

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}

    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    with Perf(metrics, "mapper"):
        mapper = ArvPathMapper(arvrunner, sc, "",
                               "keep:%s",
                               "keep:%s/%s",
                               name=name,
                               single_collection=True,
                               optional_deps=optional_deps)

    for k, v in uuid_map.items():
        mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)

    keeprefs = set()

    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def collectloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            addkeepref(p["location"])
            p["location"] = mapper.mapper(p["location"]).resolved

        if not loc:
            return

        if not loc.startswith("keep:"):
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    with Perf(metrics, "collectloc"):
        visit_class(workflowobj, ("File", "Directory"), collectloc)
        visit_class(discovered, ("File", "Directory"), collectloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(
            arvrunner.api.collections().list,
            filters=[["portable_data_hash", "in", list(keeprefs)],
                     ["owner_uuid", "=", runtimeContext.project_uuid]],
            select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(
                filters=[["portable_data_hash", "=", kr]],
                order="created_at desc",
                select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            col["name"] = arvados.util.trim_name(col["name"])
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

522 if "$schemas" in workflowobj:
524 for s in workflowobj["$schemas"]:
526 sch.append(mapper.mapper(s).resolved)
527 workflowobj["$schemas"] = sch
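# Illustrative call sketch (names and hash assumed): upload a tool document's
# file references, then consult the returned pathmapper for rewritten
# locations:
#
#   mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                runtimeContext)
#   mapper.mapper("file:///home/me/input.txt").resolved  # e.g. "keep:<pdh>/input.txt"
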
def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, runtimeContext)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)

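# Illustrative sketch (assumed tool snippet, not part of the original module):
# a CommandLineTool carrying
#
#   requirements:
#     DockerRequirement:
#       dockerPull: ubuntu:22.04
#
# has "ubuntu:22.04" mirrored into Keep via arv_docker_get_image, while a tool
# with no DockerRequirement falls back to the matching "arvados/jobs" image.
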
def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in rewrites.items()}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError(
                        "Embedded process object is missing required 'id' field, add an 'id' or upgrade to cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, v, True, runtimeContext)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)

    if git_info:
        for g in git_info:
            packed[g] = git_info[g]

    return packed

def tag_git_version(packed, tool):
    """Record the git commit hash of a file:// workflow in the packed document."""
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(
                ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
                stderr=subprocess.STDOUT, cwd=path).strip().decode("utf-8")
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def setloc(mapper, p):
    loc = p.get("location")
    if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
        p["location"] = mapper.mapper(p["location"]).resolved
        return

    if not loc:
        return

    if collectionUUID in p:
        uuid = p[collectionUUID]
        keepuuid = "keep:"+uuid
        if keepuuid not in mapper:
            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        gp = collection_pdh_pattern.match(loc)
        if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
            # This file entry has both collectionUUID and a PDH
            # location.  If the PDH doesn't match the one returned by
            # the API server, raise an error.
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Expected collection uuid %s to be %s but API server reported %s" % (
                    uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))

    gp = collection_uuid_pattern.match(loc)
    if not gp:
        # Not a uuid pattern (must be a pdh pattern)
        return

    uuid = gp.groups()[0]
    keepuuid = "keep:"+uuid
    if keepuuid not in mapper:
        raise SourceLine(p, "location", validate.ValidationException).makeError(
            "Collection uuid %s not found" % uuid)
    p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
    p[collectionUUID] = uuid

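# Illustrative sketch (identifiers and hash assumed): setloc rewrites a
# collection-uuid location to its portable data hash and records the uuid
# separately, e.g.
#
#   {"location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/reads.bam"}
#
# becomes
#
#   {"location": "keep:99999999999999999999999999999999+99/reads.bam",
#    "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz"}
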
def update_from_mapper(workflowobj, mapper):
    with Perf(metrics, "setloc"):
        visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))

def apply_merged_map(merged_map, workflowobj):
    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if "id" in v:
                    cur_id = v["id"]
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            #if v.get("class") == "DockerRequirement":
            #    v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
            #        arvrunner.api, v, True, runtimeContext)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(workflowobj, None)


def update_from_merged_map(tool, merged_map):
    tool.visit(partial(apply_merged_map, merged_map))

def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return an updated
    input object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want it to do that,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           runtimeContext,
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    _jobloaderctx = jobloaderctx.copy()
    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    jobloader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    update_from_mapper(job_order, jobmapper)

    return job_order, jobmapper

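# Illustrative before/after sketch (paths and hash assumed): given a job order
#
#   {"fastq": {"class": "File", "location": "file:///home/me/reads.fastq"}}
#
# upload_job_order uploads the local file and rewrites the reference to
#
#   {"fastq": {"class": "File", "location": "keep:<pdh>/reads.fastq"}}
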
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available
    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}
    tool_dep_cache = {}

    todo = []

    # Standard traversal is top down, we want to go bottom up, so use
    # the visitor to accumulate a list of nodes to visit, then
    # visit them in reverse order.
    def upload_tool_deps(deptool):
        if "id" in deptool:
            todo.append(deptool)

    tool.visit(upload_tool_deps)

    for deptool in reversed(todo):
        discovered_secondaryfiles = {}
        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     runtimeContext,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles,
                                     cache=tool_dep_cache)

        document_loader.idx[deptool["id"]] = deptool

        toolmap = {}
        for k, v in pm.items():
            toolmap[k] = v.resolved

        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    return merged_map

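# Illustrative sketch of the returned structure (values assumed): merged_map is
# keyed by tool document id, each entry a FileUpdates pairing resolved keep
# locations with discovered secondaryFiles, e.g.
#
#   {"file:///home/me/wf.cwl": FileUpdates(
#        resolved={"file:///home/me/script.py": "keep:<pdh>/script.py"},
#        secondaryFiles={})}
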
def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True, runtimeContext)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True,
                 git_info=None,
                 reuse_runner=False):

        self.loadingContext = loadingContext.copy()

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = self.loadingContext.enable_dev
        self.git_info = git_info
        self.fast_parser = self.loadingContext.fast_parser
        self.reuse_runner = reuse_runner

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

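    # Illustrative sketch (assumed workflow snippet): the runner's own
    # resources can be requested from within the workflow, which the
    # constructor above picks up via get_requirement():
    #
    #   hints:
    #     arv:WorkflowRunnerResources:
    #       coresMin: 2
    #       ramMin: 2048
    #       keep_cache: 512
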
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
                             include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(str(f.read(), 'utf-8'))

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)

            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)

def print_keep_deps_visitor(api, runtimeContext, references, doc_loader, tool):
    def collect_locators(obj):
        loc = obj.get("location", "")

        g = arvados.util.keepuri_pattern.match(loc)
        if g:
            references.add(g[1])

        if obj.get("class") == "http://arvados.org/cwl#WorkflowRunnerResources" and "acrContainerImage" in obj:
            references.add(obj["acrContainerImage"])

        if obj.get("class") == "DockerRequirement":
            references.add(arvados_cwl.arvdocker.arv_docker_get_image(api, obj, False, runtimeContext))

    sc_result = scandeps(tool["id"], tool,
                         set(),
                         set(("location", "id")),
                         None, urljoin=doc_loader.fetcher.urljoin,
                         nestdirs=False)

    visit_class(sc_result, ("File", "Directory"), collect_locators)
    visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), collect_locators)

def print_keep_deps(arvRunner, runtimeContext, merged_map, tool):
    references = set()

    tool.visit(partial(print_keep_deps_visitor, arvRunner.api, runtimeContext, references, tool.doc_loader))

    for mm in merged_map:
        for k, v in merged_map[mm].resolved.items():
            g = arvados.util.keepuri_pattern.match(v)
            if g:
                references.add(g[1])

    json.dump(sorted(references), arvRunner.stdout)
    print(file=arvRunner.stdout)
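
# Illustrative output sketch (hashes assumed): print_keep_deps writes a sorted
# JSON array of the Keep and Docker-image references the workflow depends on,
# e.g.
#
#   ["99999999999999999999999999999999+99", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+111"]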