1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
14 from functools import partial
18 from collections import namedtuple
19 from io import StringIO
39 from cwltool.utils import (
45 if os.name == "posix" and sys.version_info[0] < 3:
46 import subprocess32 as subprocess
50 from schema_salad.sourceline import SourceLine, cmap
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55 shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document, jobloaderctx
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
63 import schema_salad.ref_resolver
65 import arvados.collection
67 from .util import collectionUUID
68 from ruamel.yaml import YAML
69 from ruamel.yaml.comments import CommentedMap, CommentedSeq
71 import arvados_cwl.arvdocker
72 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
73 from ._version import __version__
75 from . context import ArvRuntimeContext
76 from .perf import Perf
# Loggers shared by this module: `logger` carries general runner messages,
# `metrics` is the child logger handed to the Perf(...) context managers
# used throughout this file.
logger = logging.getLogger("arvados.cwl-runner")
metrics = logging.getLogger("arvados.cwl-runner.metrics")
81 def trim_anonymous_location(obj):
82 """Remove 'location' field from File and Directory literals.
84 To make internal handling easier, literals are assigned a random id for
85 'location'. However, when writing the record back out, this can break
86 reproducibility. Since it is valid for literals not to have a 'location'
91 if obj.get("location", "").startswith("_:"):
95 def remove_redundant_fields(obj):
96 for field in ("path", "nameext", "nameroot", "dirname"):
101 def find_defaults(d, op):
102 if isinstance(d, list):
105 elif isinstance(d, dict):
109 for i in viewvalues(d):
112 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
115 files=[], # type: List[Dict[Text, Text]]
116 bindings=[], # type: List[Dict[Text, Any]]
117 schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
118 names=None, # type: Names
119 requirements=requirements, # type: List[Dict[Text, Any]]
120 hints=hints, # type: List[Dict[Text, Any]]
121 resources={}, # type: Dict[str, int]
122 mutation_manager=None, # type: Optional[MutationManager]
123 formatgraph=None, # type: Optional[Graph]
124 make_fs_access=None, # type: Type[StdFsAccess]
125 fs_access=None, # type: StdFsAccess
126 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
127 timeout=runtimeContext.eval_timeout, # type: float
128 debug=runtimeContext.debug, # type: bool
129 js_console=runtimeContext.js_console, # type: bool
130 force_docker_pull=runtimeContext.force_docker_pull, # type: bool
131 loadListing="", # type: Text
132 outdir="", # type: Text
133 tmpdir="", # type: Text
134 stagedir="", # type: Text
135 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
136 container_engine="docker"
139 def search_schemadef(name, reqs):
141 if r["class"] == "SchemaDefRequirement":
142 for sd in r["types"]:
143 if sd["name"] == name:
147 primitive_types_set = frozenset(("null", "boolean", "int", "long",
148 "float", "double", "string", "record",
151 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
152 if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
153 # union type, collect all possible secondaryFiles
154 for i in inputschema:
155 set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
158 if inputschema == "File":
159 inputschema = {"type": "File"}
161 if isinstance(inputschema, basestring):
162 sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
168 if "secondaryFiles" in inputschema:
169 # set secondaryFiles, may be inherited by compound types.
170 secondaryspec = inputschema["secondaryFiles"]
172 if (isinstance(inputschema["type"], (Mapping, Sequence)) and
173 not isinstance(inputschema["type"], basestring)):
174 # compound type (union, array, record)
175 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
177 elif (inputschema["type"] == "record" and
178 isinstance(primary, Mapping)):
180 # record type, find secondary files associated with fields.
182 for f in inputschema["fields"]:
183 p = primary.get(shortname(f["name"]))
185 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
187 elif (inputschema["type"] == "array" and
188 isinstance(primary, Sequence)):
190 # array type, find secondary files of elements
193 set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
195 elif (inputschema["type"] == "File" and
196 isinstance(primary, Mapping) and
197 primary.get("class") == "File"):
199 if "secondaryFiles" in primary or not secondaryspec:
204 # Found a file, check for secondaryFiles
207 primary["secondaryFiles"] = secondaryspec
208 for i, sf in enumerate(aslist(secondaryspec)):
209 if builder.cwlVersion == "v1.0":
212 pattern = sf["pattern"]
215 if isinstance(pattern, list):
216 specs.extend(pattern)
217 elif isinstance(pattern, dict):
218 specs.append(pattern)
219 elif isinstance(pattern, str):
220 if builder.cwlVersion == "v1.0":
221 specs.append({"pattern": pattern, "required": True})
223 specs.append({"pattern": pattern, "required": sf.get("required")})
225 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
226 "Expression must return list, object, string or null")
229 for i, sf in enumerate(specs):
230 if isinstance(sf, dict):
231 if sf.get("class") == "File":
233 if sf.get("location") is None:
234 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
235 "File object is missing 'location': %s" % sf)
236 sfpath = sf["location"]
239 pattern = sf["pattern"]
240 required = sf.get("required")
241 elif isinstance(sf, str):
245 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
246 "Expression must return list, object, string or null")
248 if pattern is not None:
249 if "${" in pattern or "$(" in pattern:
250 sfname = builder.do_eval(pattern, context=primary)
252 sfname = substitute(primary["basename"], pattern)
257 if isinstance(sfname, str):
258 p_location = primary["location"]
259 if "/" in p_location:
261 p_location[0 : p_location.rindex("/") + 1]
265 required = builder.do_eval(required, context=primary)
267 if isinstance(sfname, list) or isinstance(sfname, dict):
268 each = aslist(sfname)
270 if required and not fsaccess.exists(e.get("location")):
271 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272 "Required secondary file '%s' does not exist" % e.get("location"))
275 if isinstance(sfname, str):
276 if fsaccess.exists(sfpath):
277 if pattern is not None:
278 found.append({"location": sfpath, "class": "File"})
282 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
283 "Required secondary file '%s' does not exist" % sfpath)
285 primary["secondaryFiles"] = cmap(found)
286 if discovered is not None:
287 discovered[primary["location"]] = primary["secondaryFiles"]
288 elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
289 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    """Run set_secondary() on each input whose job order value is a container.

    For every schema entry in `inputs`, the value stored in `job_order`
    under the entry's short name is looked up.  Values that are neither a
    Mapping nor a Sequence (including absent values, which come back as
    None) are skipped.  Everything else is handed to set_secondary() with
    no initial secondaryFiles spec; `fsaccess`, `builder` and `discovered`
    are passed through unchanged.
    """
    for schema in inputs:
        input_name = shortname(schema["id"])
        value = job_order.get(input_name)
        if not isinstance(value, (Mapping, Sequence)):
            continue
        set_secondary(fsaccess, builder, schema, None, value, discovered)
297 def upload_dependencies(arvrunner, name, document_loader,
298 workflowobj, uri, runtimeContext,
299 include_primary=True, discovered_secondaryfiles=None,
301 """Upload the dependencies of the workflowobj document to Keep.
303 Returns a pathmapper object mapping local paths to keep references. Also
304 does an in-place update of references in "workflowobj".
306 Use scandeps to find $schemas, File and Directory
307 fields that represent external references.
309 If workflowobj has an "id" field, this will reload the document to ensure
310 it is scanning the raw document prior to preprocessing.
313 scanobj = workflowobj
316 with Perf(metrics, "scandeps"):
317 sc_result = scandeps(uri, scanobj,
320 None, urljoin=document_loader.fetcher.urljoin,
322 optional_deps = scandeps(uri, scanobj,
325 None, urljoin=document_loader.fetcher.urljoin,
328 if sc_result is None:
331 if optional_deps is None:
335 sc_result.extend(optional_deps)
340 def collect_uuids(obj):
341 loc = obj.get("location", "")
344 # Collect collection uuids that need to be resolved to
345 # portable data hashes
346 gp = collection_uuid_pattern.match(loc)
348 uuids[gp.groups()[0]] = obj
349 if collectionUUID in obj:
350 uuids[obj[collectionUUID]] = obj
352 def collect_uploads(obj):
353 loc = obj.get("location", "")
357 if sp[0] in ("file", "http", "https"):
358 # Record local files that need to be uploaded,
359 # don't include file literals, keep references, etc.
363 with Perf(metrics, "collect uuids"):
364 visit_class(workflowobj, ("File", "Directory"), collect_uuids)
366 with Perf(metrics, "collect uploads"):
367 visit_class(sc_result, ("File", "Directory"), collect_uploads)
369 # Resolve any collection uuids we found to portable data hashes
370 # and assign them to uuid_map
372 fetch_uuids = list(uuids.keys())
373 with Perf(metrics, "fetch_uuids"):
375 # For a large number of fetch_uuids, API server may limit
376 # response size, so keep fetching until API server has nothing
378 lookups = arvrunner.api.collections().list(
379 filters=[["uuid", "in", fetch_uuids]],
381 select=["uuid", "portable_data_hash"]).execute(
382 num_retries=arvrunner.num_retries)
384 if not lookups["items"]:
387 for l in lookups["items"]:
388 uuid_map[l["uuid"]] = l["portable_data_hash"]
390 fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
392 normalizeFilesDirs(sc)
394 if "id" in workflowobj:
395 defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
397 # make sure it's included
398 sc.append({"class": "File", "location": defrg})
400 # make sure it's excluded
401 sc = [d for d in sc if d.get("location") != defrg]
403 def visit_default(obj):
404 def defaults_are_optional(f):
405 if "location" not in f and "path" in f:
406 f["location"] = f["path"]
408 normalizeFilesDirs(f)
409 optional_deps.append(f)
410 visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
412 find_defaults(workflowobj, visit_default)
415 def discover_default_secondary_files(obj):
416 builder_job_order = {}
417 for t in obj["inputs"]:
418 builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
419 # Need to create a builder object to evaluate expressions.
420 builder = make_builder(builder_job_order,
421 obj.get("hints", []),
422 obj.get("requirements", []),
425 discover_secondary_files(arvrunner.fs_access,
431 copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
432 visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
434 for d in list(discovered):
435 # Only interested in discovered secondaryFiles which are local
436 # files that need to be uploaded.
437 if d.startswith("file:"):
438 sc.extend(discovered[d])
442 with Perf(metrics, "mapper"):
443 mapper = ArvPathMapper(arvrunner, sc, "",
447 single_collection=True,
448 optional_deps=optional_deps)
450 for k, v in uuid_map.items():
451 mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
455 if k.startswith("keep:"):
456 keeprefs.add(collection_pdh_pattern.match(k).group(1))
460 loc = p.get("location")
461 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
462 addkeepref(p["location"])
468 if collectionUUID in p:
469 uuid = p[collectionUUID]
470 if uuid not in uuid_map:
471 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
472 "Collection uuid %s not found" % uuid)
473 gp = collection_pdh_pattern.match(loc)
474 if gp and uuid_map[uuid] != gp.groups()[0]:
475 # This file entry has both collectionUUID and a PDH
476 # location. If the PDH doesn't match the one returned
477 # by the API server, raise an error.
478 raise SourceLine(p, "location", validate.ValidationException).makeError(
479 "Expected collection uuid %s to be %s but API server reported %s" % (
480 uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
482 gp = collection_uuid_pattern.match(loc)
484 # Not a uuid pattern (must be a pdh pattern)
485 addkeepref(p["location"])
488 uuid = gp.groups()[0]
489 if uuid not in uuid_map:
490 raise SourceLine(p, "location", validate.ValidationException).makeError(
491 "Collection uuid %s not found" % uuid)
493 with Perf(metrics, "collectloc"):
494 visit_class(workflowobj, ("File", "Directory"), collectloc)
495 visit_class(discovered, ("File", "Directory"), collectloc)
497 if discovered_secondaryfiles is not None:
499 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
501 if runtimeContext.copy_deps:
502 # Find referenced collections and copy them into the
503 # destination project, for easy sharing.
504 already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
505 filters=[["portable_data_hash", "in", list(keeprefs)],
506 ["owner_uuid", "=", runtimeContext.project_uuid]],
507 select=["uuid", "portable_data_hash", "created_at"]))
509 keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
511 col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
512 order="created_at desc",
513 select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
515 if len(col["items"]) == 0:
516 logger.warning("Cannot find collection with portable data hash %s", kr)
518 col = col["items"][0]
519 col["name"] = arvados.util.trim_name(col["name"])
521 arvrunner.api.collections().create(body={"collection": {
522 "owner_uuid": runtimeContext.project_uuid,
524 "description": col["description"],
525 "properties": col["properties"],
526 "portable_data_hash": col["portable_data_hash"],
527 "manifest_text": col["manifest_text"],
528 "storage_classes_desired": col["storage_classes_desired"],
529 "trash_at": col["trash_at"]
530 }}, ensure_unique_name=True).execute()
531 except Exception as e:
532 logger.warning("Unable to copy collection to destination: %s", e)
534 if "$schemas" in workflowobj:
536 for s in workflowobj["$schemas"]:
538 sch.append(mapper.mapper(s).resolved)
539 workflowobj["$schemas"] = sch
544 def upload_docker(arvrunner, tool, runtimeContext):
545 """Uploads Docker images used in CommandLineTool objects."""
547 if isinstance(tool, CommandLineTool):
548 (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
550 if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
551 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
552 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
554 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
556 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
557 True, runtimeContext)
558 elif isinstance(tool, cwltool.workflow.Workflow):
560 upload_docker(arvrunner, s.embedded_tool, runtimeContext)
563 def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
564 """Create a packed workflow.
566 A "packed" workflow is one where all the components have been combined into a single document."""
569 packed = pack(arvrunner.loadingContext, tool.tool["id"],
570 rewrite_out=rewrites,
571 loader=tool.doc_loader)
573 rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
575 def visit(v, cur_id):
576 if isinstance(v, dict):
577 if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
578 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
579 raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
581 cur_id = rewrite_to_orig.get(v["id"], v["id"])
582 if "path" in v and "location" not in v:
583 v["location"] = v["path"]
585 if "location" in v and cur_id in merged_map:
586 if v["location"] in merged_map[cur_id].resolved:
587 v["location"] = merged_map[cur_id].resolved[v["location"]]
588 if v["location"] in merged_map[cur_id].secondaryFiles:
589 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
590 if v.get("class") == "DockerRequirement":
591 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
595 if isinstance(v, list):
602 packed[g] = git_info[g]
607 def tag_git_version(packed):
608 if tool.tool["id"].startswith("file://"):
609 path = os.path.dirname(tool.tool["id"][7:])
611 githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
612 except (OSError, subprocess.CalledProcessError):
615 packed["http://schema.org/version"] = githash
617 def setloc(mapper, p):
618 loc = p.get("location")
619 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
620 p["location"] = mapper.mapper(p["location"]).resolved
626 if collectionUUID in p:
627 uuid = p[collectionUUID]
628 keepuuid = "keep:"+uuid
629 if keepuuid not in mapper:
630 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
631 "Collection uuid %s not found" % uuid)
632 gp = collection_pdh_pattern.match(loc)
633 if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
634 # This file entry has both collectionUUID and a PDH
635 # location. If the PDH doesn't match the one returned
636 # by the API server, raise an error.
637 raise SourceLine(p, "location", validate.ValidationException).makeError(
638 "Expected collection uuid %s to be %s but API server reported %s" % (
639 uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
641 gp = collection_uuid_pattern.match(loc)
643 # Not a uuid pattern (must be a pdh pattern)
646 uuid = gp.groups()[0]
647 keepuuid = "keep:"+uuid
648 if keepuuid not in mapper:
649 raise SourceLine(p, "location", validate.ValidationException).makeError(
650 "Collection uuid %s not found" % uuid)
651 p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
652 p[collectionUUID] = uuid
def update_from_mapper(workflowobj, mapper):
    """Apply setloc() with `mapper` to every File and Directory in workflowobj.

    visit_class() walks `workflowobj` and calls setloc(mapper, obj) for
    each object it selects.  The whole walk runs inside a Perf block
    labelled "setloc" on the metrics logger.
    """
    with Perf(metrics, "setloc"):
        relocate = partial(setloc, mapper)
        visit_class(workflowobj, ("File", "Directory"), relocate)
658 def apply_merged_map(merged_map, workflowobj):
659 def visit(v, cur_id):
660 if isinstance(v, dict):
661 if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
664 if "path" in v and "location" not in v:
665 v["location"] = v["path"]
667 if "location" in v and cur_id in merged_map:
668 if v["location"] in merged_map[cur_id].resolved:
669 v["location"] = merged_map[cur_id].resolved[v["location"]]
670 if v["location"] in merged_map[cur_id].secondaryFiles:
671 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
672 #if v.get("class") == "DockerRequirement":
673 # v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
677 if isinstance(v, list):
680 visit(workflowobj, None)
def update_from_merged_map(tool, merged_map):
    """Give tool.visit() a callback that runs apply_merged_map with merged_map.

    `merged_map` is bound as the first argument of apply_merged_map; whatever
    tool.visit() passes to the callback becomes the remaining argument(s).
    """
    visitor = partial(apply_merged_map, merged_map)
    tool.visit(visitor)
685 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
686 """Upload local files referenced in the input object and return updated input
687 object with 'location' updated to the proper keep references.
690 # Make a copy of the job order and set defaults.
691 builder_job_order = copy.copy(job_order)
693 # fill_in_defaults throws an error if there are any
694 # missing required parameters, we don't want it to do that
695 # so make them all optional.
696 inputs_copy = copy.deepcopy(tool.tool["inputs"])
697 for i in inputs_copy:
698 if "null" not in i["type"]:
699 i["type"] = ["null"] + aslist(i["type"])
701 fill_in_defaults(inputs_copy,
704 # Need to create a builder object to evaluate expressions.
705 builder = make_builder(builder_job_order,
710 # Now update job_order with secondaryFiles
711 discover_secondary_files(arvrunner.fs_access,
716 _jobloaderctx = jobloaderctx.copy()
717 jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
719 jobmapper = upload_dependencies(arvrunner,
723 job_order.get("id", "#"),
726 if "id" in job_order:
729 # Need to filter this out, gets added by cwltool when providing
730 # parameters on the command line.
731 if "job_order" in job_order:
732 del job_order["job_order"]
734 update_from_mapper(job_order, jobmapper)
736 return job_order, jobmapper
# Per-tool record stored in merged_map by upload_workflow_deps: `resolved`
# holds the path mapper's key -> resolved location table, `secondaryFiles`
# the secondary files discovered while uploading that tool's dependencies.
FileUpdates = namedtuple("FileUpdates", ("resolved", "secondaryFiles"))
740 def upload_workflow_deps(arvrunner, tool, runtimeContext):
741 # Ensure that Docker images needed by this workflow are available
743 with Perf(metrics, "upload_docker"):
744 upload_docker(arvrunner, tool, runtimeContext)
746 document_loader = tool.doc_loader
753 # Standard traversal is top down, we want to go bottom up, so use
754 # the visitor to accumulate a list of nodes to visit, then
755 # visit them in reverse order.
756 def upload_tool_deps(deptool):
760 tool.visit(upload_tool_deps)
762 for deptool in reversed(todo):
763 discovered_secondaryfiles = {}
764 with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
765 pm = upload_dependencies(arvrunner,
766 "%s dependencies" % (shortname(deptool["id"])),
771 include_primary=False,
772 discovered_secondaryfiles=discovered_secondaryfiles,
773 cache=tool_dep_cache)
775 document_loader.idx[deptool["id"]] = deptool
777 for k,v in pm.items():
778 toolmap[k] = v.resolved
780 merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
784 def arvados_jobs_image(arvrunner, img, runtimeContext):
785 """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
788 return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
789 True, runtimeContext)
790 except Exception as e:
791 raise Exception("Docker image %s is not available\n%s" % (img, e) )
794 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
795 collection = arvados.collection.Collection(api_client=arvrunner.api,
796 keep_client=arvrunner.keep_client,
797 num_retries=arvrunner.num_retries)
798 with collection.open("workflow.cwl", "w") as f:
799 f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
801 filters = [["portable_data_hash", "=", collection.portable_data_hash()],
802 ["name", "like", name+"%"]]
803 if runtimeContext.project_uuid:
804 filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
805 exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
808 logger.info("Using collection %s", exists["items"][0]["uuid"])
810 collection.save_new(name=name,
811 owner_uuid=runtimeContext.project_uuid,
812 ensure_unique_name=True,
813 num_retries=arvrunner.num_retries)
814 logger.info("Uploaded to %s", collection.manifest_locator())
816 return collection.portable_data_hash()
819 class Runner(Process):
820 """Base class for runner processes, which submit an instance of
821 arvados-cwl-runner and wait for the final result."""
823 def __init__(self, runner,
824 tool, loadingContext, enable_reuse,
825 output_name, output_tags, submit_runner_ram=0,
826 name=None, on_error=None, submit_runner_image=None,
827 intermediate_output_ttl=0, merged_map=None,
828 priority=None, secret_store=None,
829 collection_cache_size=256,
830 collection_cache_is_default=True,
833 self.loadingContext = loadingContext.copy()
835 super(Runner, self).__init__(tool.tool, loadingContext)
837 self.arvrunner = runner
838 self.embedded_tool = tool
839 self.job_order = None
842 # If reuse is permitted by command line arguments but
843 # disabled by the workflow itself, disable it.
844 reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
846 enable_reuse = reuse_req["enableReuse"]
847 reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
849 enable_reuse = reuse_req["enableReuse"]
850 self.enable_reuse = enable_reuse
852 self.final_output = None
853 self.output_name = output_name
854 self.output_tags = output_tags
856 self.on_error = on_error
857 self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
858 self.intermediate_output_ttl = intermediate_output_ttl
859 self.priority = priority
860 self.secret_store = secret_store
861 self.enable_dev = self.loadingContext.enable_dev
862 self.git_info = git_info
863 self.fast_parser = self.loadingContext.fast_parser
865 self.submit_runner_cores = 1
866 self.submit_runner_ram = 1024 # default 1 GiB
867 self.collection_cache_size = collection_cache_size
869 runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
870 if runner_resource_req:
871 if runner_resource_req.get("coresMin"):
872 self.submit_runner_cores = runner_resource_req["coresMin"]
873 if runner_resource_req.get("ramMin"):
874 self.submit_runner_ram = runner_resource_req["ramMin"]
875 if runner_resource_req.get("keep_cache") and collection_cache_is_default:
876 self.collection_cache_size = runner_resource_req["keep_cache"]
878 if submit_runner_ram:
879 # Command line / initializer overrides default and/or spec from workflow
880 self.submit_runner_ram = submit_runner_ram
882 if self.submit_runner_ram <= 0:
883 raise Exception("Value of submit-runner-ram must be greater than zero")
885 if self.submit_runner_cores <= 0:
886 raise Exception("Value of submit-runner-cores must be greater than zero")
888 self.merged_map = merged_map or {}
891 job_order, # type: Mapping[Text, Text]
892 output_callbacks, # type: Callable[[Any, Any], Any]
893 runtimeContext # type: RuntimeContext
894 ): # type: (...) -> Generator[Any, None, None]
895 self.job_order = job_order
896 self._init_job(job_order, runtimeContext)
899 def update_pipeline_component(self, record):
902 def done(self, record):
903 """Base method for handling a completed runner."""
906 if record["state"] == "Complete":
907 if record.get("exit_code") is not None:
908 if record["exit_code"] == 33:
909 processStatus = "UnsupportedRequirement"
910 elif record["exit_code"] == 0:
911 processStatus = "success"
913 processStatus = "permanentFail"
915 processStatus = "success"
917 processStatus = "permanentFail"
921 if processStatus == "permanentFail":
922 logc = arvados.collection.CollectionReader(record["log"],
923 api_client=self.arvrunner.api,
924 keep_client=self.arvrunner.keep_client,
925 num_retries=self.arvrunner.num_retries)
926 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
927 include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))
929 self.final_output = record["output"]
930 outc = arvados.collection.CollectionReader(self.final_output,
931 api_client=self.arvrunner.api,
932 keep_client=self.arvrunner.keep_client,
933 num_retries=self.arvrunner.num_retries)
934 if "cwl.output.json" in outc:
935 with outc.open("cwl.output.json", "rb") as f:
937 outputs = json.loads(f.read().decode())
938 def keepify(fileobj):
939 path = fileobj["location"]
940 if not path.startswith("keep:"):
941 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
942 adjustFileObjs(outputs, keepify)
943 adjustDirObjs(outputs, keepify)
945 logger.exception("[%s] While getting final output object", self.name)
946 self.arvrunner.output_callback({}, "permanentFail")
948 self.arvrunner.output_callback(outputs, processStatus)