# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring
import copy
import json
import logging
import os
import subprocess
import urllib.parse

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence
from cwltool.utils import (
    CWLObjectType,
    CWLOutputType,
)
from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document, jobloaderctx
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
import schema_salad.ref_resolver
import arvados.collection
import arvados.util
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from . import done
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
from ._version import __version__
from .context import ArvRuntimeContext
from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a
    'location' field, filter it out.

    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

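# A minimal sketch of the intended effect (values hypothetical): a File
# literal whose 'location' is an anonymous "_:" id loses that field, while
# a real location is left alone.
#
#     f = {"class": "File", "location": "_:9a1b2c", "basename": "a.txt"}
#     trim_anonymous_location(f)
#     # f == {"class": "File", "basename": "a.txt"}
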
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

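# Illustration only (input structure hypothetical): find_defaults() walks any
# nested list/dict structure and applies `op` to every dict carrying a
# "default" key, e.g. to collect or rewrite workflow input defaults.
#
#     hits = []
#     find_defaults({"inputs": [{"id": "x", "default": 42}]}, hits.append)
#     # hits == [{"id": "x", "default": 42}]
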
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
        job=joborder,
        files=[],                  # type: List[Dict[Text, Text]]
        bindings=[],               # type: List[Dict[Text, Any]]
        schemaDefs={},             # type: Dict[Text, Dict[Text, Any]]
        names=None,                # type: Names
        requirements=requirements, # type: List[Dict[Text, Any]]
        hints=hints,               # type: List[Dict[Text, Any]]
        resources={},              # type: Dict[str, int]
        mutation_manager=None,     # type: Optional[MutationManager]
        formatgraph=None,          # type: Optional[Graph]
        make_fs_access=None,       # type: Type[StdFsAccess]
        fs_access=None,            # type: StdFsAccess
        job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
        timeout=runtimeContext.eval_timeout,                     # type: float
        debug=runtimeContext.debug,                              # type: bool
        js_console=runtimeContext.js_console,                    # type: bool
        force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
        loadListing="",            # type: Text
        outdir="",                 # type: Text
        tmpdir="",                 # type: Text
        stagedir="",               # type: Text
        cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
        container_engine="docker"
    )

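# Sketch of typical use (argument values hypothetical): the Builder returned
# here is only used to evaluate CWL expressions, which is why most of its
# fields can be left empty.
#
#     builder = make_builder({"threads": 2}, [], [], runtimeContext, tool.metadata)
#     doubled = builder.do_eval("$(inputs.threads * 2)")
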
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

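# For illustration (schema names hypothetical): given
#
#     reqs = [{"class": "SchemaDefRequirement",
#              "types": [{"name": "#sample", "type": "record", "fields": []}]}]
#
# search_schemadef("#sample", reqs) returns the record schema; any unknown
# name returns None.
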
primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            # Nothing to do.
            return

        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            sfname = None
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)

                if sfname is None:
                    continue

                if isinstance(sfname, str):
                    p_location = primary["location"]
                    if "/" in p_location:
                        sfpath = (
                            p_location[0 : p_location.rindex("/") + 1]
                            + sfname
                        )

            required = builder.do_eval(required, context=primary)

            if isinstance(sfname, list) or isinstance(sfname, dict):
                each = aslist(sfname)
                for e in each:
                    if required and not fsaccess.exists(e.get("location")):
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "Required secondary file '%s' does not exist" % e.get("location"))
                found.extend(each)

            if isinstance(sfname, str) or pattern is None:
                # either a name expanded from a pattern, or a literal File
                # object whose location is already in sfpath
                if fsaccess.exists(sfpath):
                    if pattern is not None:
                        found.append({"location": sfpath, "class": "File"})
                    else:
                        found.append(sf)
                elif required:
                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                        "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

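# Hedged illustration of the discovery above (locations hypothetical).  Given
# an input schema with
#
#     secondaryFiles: [{"pattern": ".bai", "required": true}]
#
# and a primary {"class": "File", "location": "keep:xyz/sample.bam",
# "basename": "sample.bam"}, set_secondary() expands the pattern with
# substitute() to "sample.bam.bai", checks existence through fsaccess, then
# records {"class": "File", "location": "keep:xyz/sample.bam.bai"} both in
# primary["secondaryFiles"] and in `discovered`, keyed by the primary's
# location.
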
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
                        cache=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $schemas, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    scanobj = workflowobj
    metadata = scanobj

    with Perf(metrics, "scandeps"):
        sc_result = scandeps(uri, scanobj,
                             set(),
                             set(("location",)),
                             None, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)
        optional_deps = scandeps(uri, scanobj,
                                 set(),
                                 set(("$schemas",)),
                                 None, urljoin=document_loader.fetcher.urljoin,
                                 nestdirs=False)

    if sc_result is None:
        sc_result = []

    if optional_deps is None:
        optional_deps = []

    if optional_deps:
        sc_result.extend(optional_deps)

    sc = []
    uuids = {}
    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
    with Perf(metrics, "collect uuids"):
        visit_class(workflowobj, ("File", "Directory"), collect_uuids)

    with Perf(metrics, "collect uploads"):
        visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    with Perf(metrics, "fetch_uuids"):
        while fetch_uuids:
            # For a large number of fetch_uuids, the API server may limit
            # response size, so keep fetching until the API server has
            # nothing more to give us.
            lookups = arvrunner.api.collections().list(
                filters=[["uuid", "in", fetch_uuids]],
                count="none",
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=arvrunner.num_retries)

            if not lookups["items"]:
                break

            for l in lookups["items"]:
                uuid_map[l["uuid"]] = l["portable_data_hash"]

            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
        if include_primary:
            # make sure it's included
            sc.append({"class": "File", "location": defrg})
        else:
            # make sure it's excluded
            sc = [d for d in sc if d.get("location") != defrg]

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)
    discovered = {}

    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]
    with Perf(metrics, "mapper"):
        mapper = ArvPathMapper(arvrunner, sc, "",
                               "keep:%s",
                               "keep:%s/%s",
                               name=name,
                               single_collection=True,
                               optional_deps=optional_deps)

    for k, v in uuid_map.items():
        mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def collectloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            addkeepref(p["location"])
            return

        if not loc:
            return
        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)

    with Perf(metrics, "collectloc"):
        visit_class(workflowobj, ("File", "Directory"), collectloc)
        visit_class(discovered, ("File", "Directory"), collectloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                                            filters=[["portable_data_hash", "in", list(keeprefs)],
                                                                     ["owner_uuid", "=", runtimeContext.project_uuid]],
                                                            select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            col["name"] = arvados.util.trim_name(col["name"])
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)
    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

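# Hedged usage sketch (caller names assumed from the rest of this module):
#
#     mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                  tool.doc_loader, tool.tool,
#                                  tool.tool["id"], runtimeContext)
#     mapper.mapper("file:///home/me/input.txt").resolved
#     # -> "keep:<portable data hash>/input.txt"
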
def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, runtimeContext)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)

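# For illustration (image name hypothetical): a CommandLineTool carrying
#
#     requirements:
#       DockerRequirement:
#         dockerPull: quay.io/example/aligner:1.0
#
# gets that image fetched and stored in Keep, while a tool without any
# DockerRequirement falls back to the arvados/jobs image matching this
# package's version.
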
def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)

    if git_info:
        for g in git_info:
            packed[g] = git_info[g]

    return packed

def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def setloc(mapper, p):
    loc = p.get("location")
    if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
        p["location"] = mapper.mapper(p["location"]).resolved
        return

    if not loc:
        return

    if collectionUUID in p:
        uuid = p[collectionUUID]
        keepuuid = "keep:"+uuid
        if keepuuid not in mapper:
            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        gp = collection_pdh_pattern.match(loc)
        if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
            # This file entry has both collectionUUID and a PDH
            # location.  If the PDH doesn't match the one returned by
            # the API server, raise an error.
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Expected collection uuid %s to be %s but API server reported %s" % (
                    uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))

    gp = collection_uuid_pattern.match(loc)
    if not gp:
        # Not a uuid pattern (must be a pdh pattern)
        return

    uuid = gp.groups()[0]
    keepuuid = "keep:"+uuid
    if keepuuid not in mapper:
        raise SourceLine(p, "location", validate.ValidationException).makeError(
            "Collection uuid %s not found" % uuid)
    p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
    p[collectionUUID] = uuid

def update_from_mapper(workflowobj, mapper):
    with Perf(metrics, "setloc"):
        visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))

def apply_merged_map(merged_map, workflowobj):
    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if "id" in v:
                    cur_id = v["id"]
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            #if v.get("class") == "DockerRequirement":
            #    v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
            #                                                                                                 runtimeContext)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(workflowobj, None)

def update_from_merged_map(tool, merged_map):
    tool.visit(partial(apply_merged_map, merged_map))

def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return an updated
    input object with 'location' fields pointing at the proper keep references.

    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           runtimeContext,
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    _jobloaderctx = jobloaderctx.copy()
    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    jobloader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    update_from_mapper(job_order, jobmapper)

    return job_order, jobmapper

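# Hedged usage sketch (names assumed): the submit path calls this roughly as
#
#     job_order, jobmapper = upload_job_order(
#         arvrunner, "input object", tool, job_order, runtimeContext)
#
# after which every local file:// reference in job_order points at a keep:
# location instead.
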
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

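# The merged_map built below maps each tool document id to a FileUpdates,
# e.g. (values hypothetical):
#
#     {"file:///wf/main.cwl": FileUpdates(
#         resolved={"file:///wf/data.txt": "keep:<pdh>/data.txt"},
#         secondaryFiles={})}
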
def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available
    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}
    tool_dep_cache = {}

    todo = []

    # Standard traversal is top down, we want to go bottom up, so use
    # the visitor to accumulate a list of nodes to visit, then
    # visit them in reverse order.
    def upload_tool_deps(deptool):
        if "id" in deptool:
            todo.append(deptool)

    tool.visit(upload_tool_deps)

    for deptool in reversed(todo):
        discovered_secondaryfiles = {}
        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     runtimeContext,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles,
                                     cache=tool_dep_cache)

        document_loader.idx[deptool["id"]] = deptool
        toolmap = {}
        for k,v in pm.items():
            toolmap[k] = v.resolved

        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    return merged_map

def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True, runtimeContext)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True,
                 git_info=None,
                 reuse_runner=False):

        self.loadingContext = loadingContext.copy()

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = self.loadingContext.enable_dev
        self.git_info = git_info
        self.fast_parser = self.loadingContext.fast_parser
        self.reuse_runner = reuse_runner

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):                 # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self
    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}
            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
                             include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)

            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)

def print_keep_deps_visitor(api, runtimeContext, references, doc_loader, tool):
    def collect_locators(obj):
        loc = obj.get("location", "")

        g = arvados.util.keepuri_pattern.match(loc)
        if g:
            references.add(g.group(1))

        if obj.get("class") == "http://arvados.org/cwl#WorkflowRunnerResources" and "acrContainerImage" in obj:
            references.add(obj["acrContainerImage"])

        if obj.get("class") == "DockerRequirement":
            references.add(arvados_cwl.arvdocker.arv_docker_get_image(api, obj, False, runtimeContext))

    sc_result = scandeps(tool["id"], tool,
                         set(),
                         set(("location", "id")),
                         None, urljoin=doc_loader.fetcher.urljoin,
                         nestdirs=False)

    visit_class(sc_result, ("File", "Directory"), collect_locators)
    visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), collect_locators)

def print_keep_deps(arvRunner, runtimeContext, merged_map, tool):
    references = set()

    tool.visit(partial(print_keep_deps_visitor, arvRunner.api, runtimeContext, references, tool.doc_loader))

    for mm in merged_map:
        for k, v in merged_map[mm].resolved.items():
            g = arvados.util.keepuri_pattern.match(v)
            if g:
                references.add(g.group(1))

    json.dump(sorted(references), arvRunner.stdout)
    print(file=arvRunner.stdout)