1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
14 from functools import partial
18 from collections import namedtuple
19 from io import StringIO
39 from cwltool.utils import (
45 if os.name == "posix" and sys.version_info[0] < 3:
46 import subprocess32 as subprocess
50 from schema_salad.sourceline import SourceLine, cmap
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55 shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document, jobloaderctx
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
63 import schema_salad.ref_resolver
65 import arvados.collection
67 from .util import collectionUUID
68 from ruamel.yaml import YAML
69 from ruamel.yaml.comments import CommentedMap, CommentedSeq
71 import arvados_cwl.arvdocker
72 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
73 from ._version import __version__
75 from .context import ArvRuntimeContext
76 from .perf import Perf
78 logger = logging.getLogger('arvados.cwl-runner')
79 metrics = logging.getLogger('arvados.cwl-runner.metrics')
81 def trim_anonymous_location(obj):
82 """Remove 'location' field from File and Directory literals.
84 To make internal handling easier, literals are assigned a random id for
85 'location'. However, when writing the record back out, this can break
86 reproducibility. Since it is valid for literals not to have a 'location'
91 if obj.get("location", "").startswith("_:"):
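# Illustrative sketch (hypothetical values): a File literal such as
# {"class": "File", "location": "_:d3b0...", "basename": "x.txt", "contents": "hi"}
# has its "_:"-prefixed 'location' removed here, leaving the literal otherwise intact.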
95 def remove_redundant_fields(obj):
96 for field in ("path", "nameext", "nameroot", "dirname"):
101 def find_defaults(d, op):
102 if isinstance(d, list):
105 elif isinstance(d, dict):
109 for i in viewvalues(d):
112 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
115 files=[], # type: List[Dict[Text, Text]]
116 bindings=[], # type: List[Dict[Text, Any]]
117 schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
118 names=None, # type: Names
119 requirements=requirements, # type: List[Dict[Text, Any]]
120 hints=hints, # type: List[Dict[Text, Any]]
121 resources={}, # type: Dict[str, int]
122 mutation_manager=None, # type: Optional[MutationManager]
123 formatgraph=None, # type: Optional[Graph]
124 make_fs_access=None, # type: Type[StdFsAccess]
125 fs_access=None, # type: StdFsAccess
126 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
127 timeout=runtimeContext.eval_timeout, # type: float
128 debug=runtimeContext.debug, # type: bool
129 js_console=runtimeContext.js_console, # type: bool
130 force_docker_pull=runtimeContext.force_docker_pull, # type: bool
131 loadListing="", # type: Text
132 outdir="", # type: Text
133 tmpdir="", # type: Text
134 stagedir="", # type: Text
135 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
136 container_engine="docker"
139 def search_schemadef(name, reqs):
141 if r["class"] == "SchemaDefRequirement":
142 for sd in r["types"]:
143 if sd["name"] == name:
147 primitive_types_set = frozenset(("null", "boolean", "int", "long",
148 "float", "double", "string", "record",
151 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
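"""Traverse an input schema and attach secondaryFiles to File values.

Recurses through unions, named schema definitions, records, and arrays
until it reaches File entries, then evaluates the secondaryFiles patterns
against the primary File and records newly found files in 'discovered'.
"""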
152 if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
153 # union type, collect all possible secondaryFiles
154 for i in inputschema:
155 set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
158 if inputschema == "File":
159 inputschema = {"type": "File"}
161 if isinstance(inputschema, basestring):
162 sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
168 if "secondaryFiles" in inputschema:
169 # set secondaryFiles; it may be inherited by compound types.
170 secondaryspec = inputschema["secondaryFiles"]
172 if (isinstance(inputschema["type"], (Mapping, Sequence)) and
173 not isinstance(inputschema["type"], basestring)):
174 # compound type (union, array, record)
175 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
177 elif (inputschema["type"] == "record" and
178 isinstance(primary, Mapping)):
180 # record type, find secondary files associated with fields.
182 for f in inputschema["fields"]:
183 p = primary.get(shortname(f["name"]))
185 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
187 elif (inputschema["type"] == "array" and
188 isinstance(primary, Sequence)):
190 # array type, find secondary files of elements
193 set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
195 elif (inputschema["type"] == "File" and
196 isinstance(primary, Mapping) and
197 primary.get("class") == "File"):
199 if "secondaryFiles" in primary or not secondaryspec:
204 # Found a file, check for secondaryFiles
207 primary["secondaryFiles"] = secondaryspec
208 for i, sf in enumerate(aslist(secondaryspec)):
209 if builder.cwlVersion == "v1.0":
212 pattern = sf["pattern"]
215 if isinstance(pattern, list):
216 specs.extend(pattern)
217 elif isinstance(pattern, dict):
218 specs.append(pattern)
219 elif isinstance(pattern, str):
220 if builder.cwlVersion == "v1.0":
221 specs.append({"pattern": pattern, "required": True})
223 specs.append({"pattern": pattern, "required": sf.get("required")})
225 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
226 "Expression must return list, object, string or null")
229 for i, sf in enumerate(specs):
230 if isinstance(sf, dict):
231 if sf.get("class") == "File":
233 if sf.get("location") is None:
234 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
235 "File object is missing 'location': %s" % sf)
236 sfpath = sf["location"]
239 pattern = sf["pattern"]
240 required = sf.get("required")
241 elif isinstance(sf, str):
245 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
246 "Expression must return list, object, string or null")
248 if pattern is not None:
249 if "${" in pattern or "$(" in pattern:
250 sfname = builder.do_eval(pattern, context=primary)
252 sfname = substitute(primary["basename"], pattern)
257 if isinstance(sfname, str):
258 p_location = primary["location"]
259 if "/" in p_location:
261 p_location[0 : p_location.rindex("/") + 1]
265 required = builder.do_eval(required, context=primary)
267 if isinstance(sfname, list) or isinstance(sfname, dict):
268 each = aslist(sfname)
270 if required and not fsaccess.exists(e.get("location")):
271 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272 "Required secondary file '%s' does not exist" % e.get("location"))
275 if isinstance(sfname, str):
276 if fsaccess.exists(sfpath):
277 if pattern is not None:
278 found.append({"location": sfpath, "class": "File"})
282 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
283 "Required secondary file '%s' does not exist" % sfpath)
285 primary["secondaryFiles"] = cmap(found)
286 if discovered is not None:
287 discovered[primary["location"]] = primary["secondaryFiles"]
288 elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
289 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
291 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
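"""Look up the primary value for each input parameter in the job order and
use set_secondary() to attach any secondaryFiles it should carry."""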
292 for inputschema in inputs:
293 primary = job_order.get(shortname(inputschema["id"]))
294 if isinstance(primary, (Mapping, Sequence)):
295 set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
297 def upload_dependencies(arvrunner, name, document_loader,
298 workflowobj, uri, runtimeContext,
299 include_primary=True, discovered_secondaryfiles=None,
301 """Upload the dependencies of the workflowobj document to Keep.
303 Returns a pathmapper object mapping local paths to keep references. Also
304 does an in-place update of references in "workflowobj".
306 Use scandeps to find $schemas, File and Directory
307 fields that represent external references.
309 If workflowobj has an "id" field, this will reload the document to ensure
310 it is scanning the raw document prior to preprocessing.
313 scanobj = workflowobj
316 with Perf(metrics, "scandeps"):
317 sc_result = scandeps(uri, scanobj,
320 None, urljoin=document_loader.fetcher.urljoin,
322 optional_deps = scandeps(uri, scanobj,
325 None, urljoin=document_loader.fetcher.urljoin,
328 if sc_result is None:
331 if optional_deps is None:
335 sc_result.extend(optional_deps)
337 #print("BOING", uri, sc_result)
342 def collect_uuids(obj):
343 loc = obj.get("location", "")
346 # Collect collection uuids that need to be resolved to
347 # portable data hashes
348 gp = collection_uuid_pattern.match(loc)
350 uuids[gp.groups()[0]] = obj
351 if collectionUUID in obj:
352 uuids[obj[collectionUUID]] = obj
354 def collect_uploads(obj):
355 loc = obj.get("location", "")
359 if sp[0] in ("file", "http", "https"):
360 # Record local files that need to be uploaded,
361 # don't include file literals, keep references, etc.
365 with Perf(metrics, "collect uuids"):
366 visit_class(workflowobj, ("File", "Directory"), collect_uuids)
368 with Perf(metrics, "collect uploads"):
369 visit_class(sc_result, ("File", "Directory"), collect_uploads)
371 # Resolve any collection uuids we found to portable data hashes
372 # and assign them to uuid_map
374 fetch_uuids = list(uuids.keys())
375 with Perf(metrics, "fetch_uuids"):
377 # For a large number of fetch_uuids, API server may limit
378 # response size, so keep fetching until the API server has nothing
380 lookups = arvrunner.api.collections().list(
381 filters=[["uuid", "in", fetch_uuids]],
383 select=["uuid", "portable_data_hash"]).execute(
384 num_retries=arvrunner.num_retries)
386 if not lookups["items"]:
389 for l in lookups["items"]:
390 uuid_map[l["uuid"]] = l["portable_data_hash"]
392 fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
394 normalizeFilesDirs(sc)
396 if "id" in workflowobj:
397 defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
399 # make sure it's included
400 sc.append({"class": "File", "location": defrg})
402 # make sure it's excluded
403 sc = [d for d in sc if d.get("location") != defrg]
405 def visit_default(obj):
406 def defaults_are_optional(f):
407 if "location" not in f and "path" in f:
408 f["location"] = f["path"]
410 normalizeFilesDirs(f)
411 optional_deps.append(f)
412 visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
414 find_defaults(workflowobj, visit_default)
417 def discover_default_secondary_files(obj):
418 builder_job_order = {}
419 for t in obj["inputs"]:
420 builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
421 # Need to create a builder object to evaluate expressions.
422 builder = make_builder(builder_job_order,
423 obj.get("hints", []),
424 obj.get("requirements", []),
427 discover_secondary_files(arvrunner.fs_access,
433 copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
434 visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
436 for d in list(discovered):
437 # Only interested in discovered secondaryFiles which are local
438 # files that need to be uploaded.
439 if d.startswith("file:"):
440 sc.extend(discovered[d])
444 with Perf(metrics, "mapper"):
445 mapper = ArvPathMapper(arvrunner, sc, "",
449 single_collection=True,
450 optional_deps=optional_deps)
452 for k, v in uuid_map.items():
453 mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
457 if k.startswith("keep:"):
458 keeprefs.add(collection_pdh_pattern.match(k).group(1))
462 loc = p.get("location")
463 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
464 addkeepref(p["location"])
470 if collectionUUID in p:
471 uuid = p[collectionUUID]
472 if uuid not in uuid_map:
473 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
474 "Collection uuid %s not found" % uuid)
475 gp = collection_pdh_pattern.match(loc)
476 if gp and uuid_map[uuid] != gp.groups()[0]:
477 # This file entry has both collectionUUID and a PDH
478 # location. If the PDH doesn't match the one returned by
479 # the API server, raise an error.
480 raise SourceLine(p, "location", validate.ValidationException).makeError(
481 "Expected collection uuid %s to be %s but API server reported %s" % (
482 uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
484 gp = collection_uuid_pattern.match(loc)
486 # Not a uuid pattern (must be a pdh pattern)
487 addkeepref(p["location"])
490 uuid = gp.groups()[0]
491 if uuid not in uuid_map:
492 raise SourceLine(p, "location", validate.ValidationException).makeError(
493 "Collection uuid %s not found" % uuid)
495 with Perf(metrics, "collectloc"):
496 visit_class(workflowobj, ("File", "Directory"), collectloc)
497 visit_class(discovered, ("File", "Directory"), collectloc)
499 if discovered_secondaryfiles is not None:
501 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
503 if runtimeContext.copy_deps:
504 # Find referenced collections and copy them into the
505 # destination project, for easy sharing.
506 already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
507 filters=[["portable_data_hash", "in", list(keeprefs)],
508 ["owner_uuid", "=", runtimeContext.project_uuid]],
509 select=["uuid", "portable_data_hash", "created_at"]))
511 keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
513 col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
514 order="created_at desc",
515 select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
517 if len(col["items"]) == 0:
518 logger.warning("Cannot find collection with portable data hash %s", kr)
520 col = col["items"][0]
521 col["name"] = arvados.util.trim_name(col["name"])
523 arvrunner.api.collections().create(body={"collection": {
524 "owner_uuid": runtimeContext.project_uuid,
526 "description": col["description"],
527 "properties": col["properties"],
528 "portable_data_hash": col["portable_data_hash"],
529 "manifest_text": col["manifest_text"],
530 "storage_classes_desired": col["storage_classes_desired"],
531 "trash_at": col["trash_at"]
532 }}, ensure_unique_name=True).execute()
533 except Exception as e:
534 logger.warning("Unable to copy collection to destination: %s", e)
536 if "$schemas" in workflowobj:
538 for s in workflowobj["$schemas"]:
540 sch.append(mapper.mapper(s).resolved)
541 workflowobj["$schemas"] = sch
546 def upload_docker(arvrunner, tool, runtimeContext):
547 """Uploads Docker images used in CommandLineTool objects."""
549 if isinstance(tool, CommandLineTool):
550 (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
552 if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
553 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
554 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
556 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
558 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
559 True, runtimeContext)
560 elif isinstance(tool, cwltool.workflow.Workflow):
562 upload_docker(arvrunner, s.embedded_tool, runtimeContext)
565 def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
566 """Create a packed workflow.
568 A "packed" workflow is one where all the components have been combined into a single document."""
571 packed = pack(arvrunner.loadingContext, tool.tool["id"],
572 rewrite_out=rewrites,
573 loader=tool.doc_loader)
575 rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
577 def visit(v, cur_id):
578 if isinstance(v, dict):
579 if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
580 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
581 raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
583 cur_id = rewrite_to_orig.get(v["id"], v["id"])
584 if "path" in v and "location" not in v:
585 v["location"] = v["path"]
587 if "location" in v and cur_id in merged_map:
588 if v["location"] in merged_map[cur_id].resolved:
589 v["location"] = merged_map[cur_id].resolved[v["location"]]
590 if v["location"] in merged_map[cur_id].secondaryFiles:
591 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
592 if v.get("class") == "DockerRequirement":
593 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
597 if isinstance(v, list):
604 packed[g] = git_info[g]
609 def tag_git_version(packed):
610 if tool.tool["id"].startswith("file://"):
611 path = os.path.dirname(tool.tool["id"][7:])
613 githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
614 except (OSError, subprocess.CalledProcessError):
617 packed["http://schema.org/version"] = githash
619 def setloc(mapper, p):
620 loc = p.get("location")
621 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
622 p["location"] = mapper.mapper(p["location"]).resolved
628 if collectionUUID in p:
629 uuid = p[collectionUUID]
630 keepuuid = "keep:"+uuid
631 if keepuuid not in mapper:
632 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
633 "Collection uuid %s not found" % uuid)
634 gp = collection_pdh_pattern.match(loc)
635 if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
636 # This file entry has both collectionUUID and a PDH
637 # location. If the PDH doesn't match the one returned by
638 # the API server, raise an error.
639 raise SourceLine(p, "location", validate.ValidationException).makeError(
640 "Expected collection uuid %s to be %s but API server reported %s" % (
641 uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
643 gp = collection_uuid_pattern.match(loc)
645 # Not a uuid pattern (must be a pdh pattern)
648 uuid = gp.groups()[0]
649 keepuuid = "keep:"+uuid
650 if keepuuid not in mapper:
651 raise SourceLine(p, "location", validate.ValidationException).makeError(
652 "Collection uuid %s not found" % uuid)
653 p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
654 p[collectionUUID] = uuid
656 def update_from_mapper(workflowobj, mapper):
657 with Perf(metrics, "setloc"):
658 visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
660 def apply_merged_map(merged_map, workflowobj):
661 def visit(v, cur_id):
662 if isinstance(v, dict):
663 if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
666 if "path" in v and "location" not in v:
667 v["location"] = v["path"]
669 if "location" in v and cur_id in merged_map:
670 if v["location"] in merged_map[cur_id].resolved:
671 v["location"] = merged_map[cur_id].resolved[v["location"]]
672 if v["location"] in merged_map[cur_id].secondaryFiles:
673 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
674 #if v.get("class") == "DockerRequirement":
675 # v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
679 if isinstance(v, list):
682 visit(workflowobj, None)
684 def update_from_merged_map(tool, merged_map):
685 tool.visit(partial(apply_merged_map, merged_map))
687 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
688 """Upload local files referenced in the input object and return updated input
689 object with 'location' updated to the proper keep references.
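For example (hypothetical input), {"reads": {"class": "File", "location":
"file:///tmp/reads.fastq"}} would be returned with the location rewritten to
something like "keep:<portable data hash>/reads.fastq".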
692 # Make a copy of the job order and set defaults.
693 builder_job_order = copy.copy(job_order)
695 # fill_in_defaults throws an error if there are any
696 # missing required parameters, we don't want it to do that
697 # so make them all optional.
698 inputs_copy = copy.deepcopy(tool.tool["inputs"])
699 for i in inputs_copy:
700 if "null" not in i["type"]:
701 i["type"] = ["null"] + aslist(i["type"])
703 fill_in_defaults(inputs_copy,
706 # Need to create a builder object to evaluate expressions.
707 builder = make_builder(builder_job_order,
712 # Now update job_order with secondaryFiles
713 discover_secondary_files(arvrunner.fs_access,
718 _jobloaderctx = jobloaderctx.copy()
719 jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
721 jobmapper = upload_dependencies(arvrunner,
725 job_order.get("id", "#"),
728 if "id" in job_order:
731 # Need to filter this out, gets added by cwltool when providing
732 # parameters on the command line.
733 if "job_order" in job_order:
734 del job_order["job_order"]
736 update_from_mapper(job_order, jobmapper)
738 #print(json.dumps(job_order, indent=2))
740 return job_order, jobmapper
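# For each tool document, FileUpdates pairs 'resolved' (original location ->
# Keep location) with 'secondaryFiles' (location -> discovered secondaryFiles).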
742 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
744 def upload_workflow_deps(arvrunner, tool, runtimeContext):
745 # Ensure that Docker images needed by this workflow are available
747 with Perf(metrics, "upload_docker"):
748 upload_docker(arvrunner, tool, runtimeContext)
750 document_loader = tool.doc_loader
757 # Standard traversal is top down, but we want to go bottom up, so use
758 # the visitor to accumulate a list of nodes to visit, then
759 # visit them in reverse order.
760 def upload_tool_deps(deptool):
764 tool.visit(upload_tool_deps)
766 for deptool in reversed(todo):
767 discovered_secondaryfiles = {}
768 with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
769 pm = upload_dependencies(arvrunner,
770 "%s dependencies" % (shortname(deptool["id"])),
775 include_primary=False,
776 discovered_secondaryfiles=discovered_secondaryfiles,
777 cache=tool_dep_cache)
779 document_loader.idx[deptool["id"]] = deptool
781 for k,v in pm.items():
782 toolmap[k] = v.resolved
784 #print("visited", deptool["id"], toolmap, discovered_secondaryfiles)
786 merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
790 def arvados_jobs_image(arvrunner, img, runtimeContext):
791 """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
794 return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
795 True, runtimeContext)
796 except Exception as e:
797 raise Exception("Docker image %s is not available\n%s" % (img, e))
800 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
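"""Save the packed workflow JSON as 'workflow.cwl' in a Keep collection.

Reuses an existing collection with the same portable data hash and name
prefix when one is found; otherwise saves a new collection. Returns the
collection's portable data hash.
"""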
801 collection = arvados.collection.Collection(api_client=arvrunner.api,
802 keep_client=arvrunner.keep_client,
803 num_retries=arvrunner.num_retries)
804 with collection.open("workflow.cwl", "w") as f:
805 f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
807 filters = [["portable_data_hash", "=", collection.portable_data_hash()],
808 ["name", "like", name+"%"]]
809 if runtimeContext.project_uuid:
810 filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
811 exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
814 logger.info("Using collection %s", exists["items"][0]["uuid"])
816 collection.save_new(name=name,
817 owner_uuid=runtimeContext.project_uuid,
818 ensure_unique_name=True,
819 num_retries=arvrunner.num_retries)
820 logger.info("Uploaded to %s", collection.manifest_locator())
822 return collection.portable_data_hash()
825 class Runner(Process):
826 """Base class for runner processes, which submit an instance of
827 arvados-cwl-runner and wait for the final result."""
829 def __init__(self, runner,
830 tool, loadingContext, enable_reuse,
831 output_name, output_tags, submit_runner_ram=0,
832 name=None, on_error=None, submit_runner_image=None,
833 intermediate_output_ttl=0, merged_map=None,
834 priority=None, secret_store=None,
835 collection_cache_size=256,
836 collection_cache_is_default=True,
839 self.loadingContext = loadingContext.copy()
841 super(Runner, self).__init__(tool.tool, loadingContext)
843 self.arvrunner = runner
844 self.embedded_tool = tool
845 self.job_order = None
848 # If reuse is permitted by command line arguments but
849 # disabled by the workflow itself, disable it.
850 reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
852 enable_reuse = reuse_req["enableReuse"]
853 reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
855 enable_reuse = reuse_req["enableReuse"]
856 self.enable_reuse = enable_reuse
858 self.final_output = None
859 self.output_name = output_name
860 self.output_tags = output_tags
862 self.on_error = on_error
863 self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
864 self.intermediate_output_ttl = intermediate_output_ttl
865 self.priority = priority
866 self.secret_store = secret_store
867 self.enable_dev = self.loadingContext.enable_dev
868 self.git_info = git_info
869 self.fast_parser = self.loadingContext.fast_parser
871 self.submit_runner_cores = 1
872 self.submit_runner_ram = 1024 # default 1 GiB
873 self.collection_cache_size = collection_cache_size
875 runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
876 if runner_resource_req:
877 if runner_resource_req.get("coresMin"):
878 self.submit_runner_cores = runner_resource_req["coresMin"]
879 if runner_resource_req.get("ramMin"):
880 self.submit_runner_ram = runner_resource_req["ramMin"]
881 if runner_resource_req.get("keep_cache") and collection_cache_is_default:
882 self.collection_cache_size = runner_resource_req["keep_cache"]
884 if submit_runner_ram:
885 # Command line / initializer overrides default and/or spec from workflow
886 self.submit_runner_ram = submit_runner_ram
888 if self.submit_runner_ram <= 0:
889 raise Exception("Value of submit-runner-ram must be greater than zero")
891 if self.submit_runner_cores <= 0:
892 raise Exception("Value of submit-runner-cores must be greater than zero")
894 self.merged_map = merged_map or {}
897 job_order, # type: Mapping[Text, Text]
898 output_callbacks, # type: Callable[[Any, Any], Any]
899 runtimeContext # type: RuntimeContext
900 ): # type: (...) -> Generator[Any, None, None]
901 self.job_order = job_order
902 self._init_job(job_order, runtimeContext)
905 def update_pipeline_component(self, record):
908 def done(self, record):
909 """Base method for handling a completed runner."""
912 if record["state"] == "Complete":
913 if record.get("exit_code") is not None:
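# The runner container signals an unsupported requirement with exit code
# 33; exit code 0 is success, and any other exit code is a permanent failure.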
914 if record["exit_code"] == 33:
915 processStatus = "UnsupportedRequirement"
916 elif record["exit_code"] == 0:
917 processStatus = "success"
919 processStatus = "permanentFail"
921 processStatus = "success"
923 processStatus = "permanentFail"
927 if processStatus == "permanentFail":
928 logc = arvados.collection.CollectionReader(record["log"],
929 api_client=self.arvrunner.api,
930 keep_client=self.arvrunner.keep_client,
931 num_retries=self.arvrunner.num_retries)
932 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
934 self.final_output = record["output"]
935 outc = arvados.collection.CollectionReader(self.final_output,
936 api_client=self.arvrunner.api,
937 keep_client=self.arvrunner.keep_client,
938 num_retries=self.arvrunner.num_retries)
939 if "cwl.output.json" in outc:
940 with outc.open("cwl.output.json", "rb") as f:
942 outputs = json.loads(f.read().decode())
943 def keepify(fileobj):
944 path = fileobj["location"]
945 if not path.startswith("keep:"):
946 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
947 adjustFileObjs(outputs, keepify)
948 adjustDirObjs(outputs, keepify)
950 logger.exception("[%s] While getting final output object", self.name)
951 self.arvrunner.output_callback({}, "permanentFail")
953 self.arvrunner.output_callback(outputs, processStatus)