# Copyright (C) The Arvados Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

from functools import partial
from collections import namedtuple
from io import StringIO

from cwltool.utils import (

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document, jobloaderctx
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
import schema_salad.ref_resolver

import arvados.collection

from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from .context import ArvRuntimeContext
from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'

    if obj.get("location", "").startswith("_:"):

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):

def find_defaults(d, op):
    if isinstance(d, list):
    elif isinstance(d, dict):
            for i in viewvalues(d):
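# Illustrative note (not from the original source): find_defaults() appears to
# walk an arbitrarily nested structure of lists and dicts and invoke `op` on
# any mapping that carries a "default" key.  A hypothetical call might be:
#
#     find_defaults({"inputs": [{"id": "x",
#                                "default": {"class": "File", "path": "x.txt"}}]},
#                   lambda d: print(d["default"]))
#
# which would print the File literal used as the default for input "x".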

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
                   files=[],  # type: List[Dict[Text, Text]]
                   bindings=[],  # type: List[Dict[Text, Any]]
                   schemaDefs={},  # type: Dict[Text, Dict[Text, Any]]
                   names=None,  # type: Names
                   requirements=requirements,  # type: List[Dict[Text, Any]]
                   hints=hints,  # type: List[Dict[Text, Any]]
                   resources={},  # type: Dict[str, int]
                   mutation_manager=None,  # type: Optional[MutationManager]
                   formatgraph=None,  # type: Optional[Graph]
                   make_fs_access=None,  # type: Type[StdFsAccess]
                   fs_access=None,  # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,  # type: float
                   debug=runtimeContext.debug,  # type: bool
                   js_console=runtimeContext.js_console,  # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                   loadListing="",  # type: Text
                   outdir="",  # type: Text
                   tmpdir="",  # type: Text
                   stagedir="",  # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"

def search_schemadef(name, reqs):
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:

        # Found a file, check for secondaryFiles
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = sf["pattern"]

            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                    specs.append({"pattern": pattern, "required": sf.get("required")})
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                    sfname = substitute(primary["basename"], pattern)

            if isinstance(sfname, str):
                p_location = primary["location"]
                if "/" in p_location:
                        p_location[0 : p_location.rindex("/") + 1]

                required = builder.do_eval(required, context=primary)

            if isinstance(sfname, list) or isinstance(sfname, dict):
                each = aslist(sfname)
                    if required and not fsaccess.exists(e.get("location")):
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "Required secondary file '%s' does not exist" % e.get("location"))

            if isinstance(sfname, str):
                if fsaccess.exists(sfpath):
                    if pattern is not None:
                        found.append({"location": sfpath, "class": "File"})
                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                        "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
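# Illustrative note (not from the original source): secondaryFiles patterns are
# resolved against the primary file's basename via cwltool's substitute(), so,
# for example (assuming standard cwltool semantics):
#
#     substitute("sample.bam", ".bai")   # -> "sample.bam.bai"
#     substitute("sample.bam", "^.bai")  # -> "sample.bai" (each "^" strips one extension)
#
# Patterns containing "$(...)" or "${...}" are instead evaluated as CWL
# expressions with the primary File object as context (see above).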

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
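# Illustrative sketch (not from the original source): discover_secondary_files()
# fills in "secondaryFiles" on File entries in a job order.  Hypothetical shape:
#
#     job_order = {"reads": {"class": "File", "location": "file:///tmp/sample.bam"}}
#     discover_secondary_files(fs_access, builder, tool_inputs, job_order, discovered={})
#     # afterwards, assuming the input schema declares a ".bai" secondaryFiles pattern:
#     # job_order["reads"]["secondaryFiles"] == [{"class": "File",
#     #                                           "location": "file:///tmp/sample.bam.bai"}]
#
# Names such as fs_access, builder and tool_inputs stand in for the real arguments.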

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.

        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            if cache is not None and defrg in cache:
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            result = yamlloader.load(textIO)
            if cache is not None:
                cache[defrg] = result

        loadref_fields = set(("$import", "run"))
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
        if cache is not None and defrg not in cache:
            # If we haven't seen this file before, we want the raw file
            # content (before preprocessing) to ensure that external
            # references like $include haven't already been inlined.
            scanobj = loadref("", workflowobj["id"])

    with Perf(metrics, "scandeps include, location"):
        sc_result = scandeps(uri, scanobj,
                             set(("$include", "location")),
                             loadref, urljoin=document_loader.fetcher.urljoin,

    with Perf(metrics, "scandeps $schemas"):
        optional_deps = scandeps(uri, scanobj,
                                 loadref, urljoin=document_loader.fetcher.urljoin,

    if sc_result is None:
    if optional_deps is None:
        sc_result.extend(optional_deps)

    def collect_uuids(obj):
        loc = obj.get("location", "")
        # Collect collection uuids that need to be resolved to
        # portable data hashes
        gp = collection_uuid_pattern.match(loc)
            uuids[gp.groups()[0]] = obj
        if collectionUUID in obj:
            uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.

    with Perf(metrics, "collect uuids"):
        visit_class(workflowobj, ("File", "Directory"), collect_uuids)

    with Perf(metrics, "collect uploads"):
        visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    fetch_uuids = list(uuids.keys())
    with Perf(metrics, "fetch_uuids"):
            # For a large number of fetch_uuids, API server may limit
            # response size, so keep fetching from the API server until it has nothing
            lookups = arvrunner.api.collections().list(
                filters=[["uuid", "in", fetch_uuids]],
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=arvrunner.num_retries)

            if not lookups["items"]:

            for l in lookups["items"]:
                uuid_map[l["uuid"]] = l["portable_data_hash"]

            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
            # make sure it's included
            sc.append({"class": "File", "location": defrg})
            # make sure it's excluded
            sc = [d for d in sc if d.get("location") != defrg]

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
        discover_secondary_files(arvrunner.fs_access,

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])

    with Perf(metrics, "mapper"):
        mapper = ArvPathMapper(arvrunner, sc, "",
                               single_collection=True,
                               optional_deps=optional_deps)
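    # Note added for clarity (not in the original source): at this point
    # `mapper` maps each collected source location (e.g. "file:///...") to an
    # uploaded Keep reference of the form "keep:<portable data hash>/<name>";
    # setloc() below rewrites the workflow document to use those references.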

        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            addkeepref(p["location"])

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid
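    # Example added for clarity (hypothetical identifiers): setloc() turns a
    # location like
    #     "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/data.txt"
    # into
    #     "keep:<portable data hash of that collection>/data.txt"
    # and records the original collection UUID under the collectionUUID key.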

    with Perf(metrics, "setloc"):
        visit_class(workflowobj, ("File", "Directory"), setloc)
        visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                     filters=[["portable_data_hash", "in", list(keeprefs)],
                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
                                     select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
            col = col["items"][0]
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch
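# Note added for clarity (not in the original source): upload_dependencies()
# is called below both for the job order (upload_job_order) and once per tool
# document (upload_workflow_deps); the pathmapper it produces is used to build
# the per-tool FileUpdates entries in merged_map.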

def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)

def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext.project_uuid,
                                                                                                             runtimeContext.force_docker_pull,
                                                                                                             runtimeContext.tmp_outdir_prefix,
                                                                                                             runtimeContext.match_local_docker,
                                                                                                             runtimeContext.copy_deps)
        if isinstance(v, list):

            packed[g] = git_info[g]

def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,

    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,

    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,

    _jobloaderctx = jobloaderctx.copy()
    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)

    jobmapper = upload_dependencies(arvrunner,
                                    job_order.get("id", "#"),

    if "id" in job_order:

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
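# Note added for clarity (not in the original source): each FileUpdates entry
# in merged_map (see upload_workflow_deps below) pairs
#   - resolved:       {original location -> "keep:..." reference}
#   - secondaryFiles: {original location -> discovered secondaryFiles list}
# packed_workflow() consults both when rewriting the packed document.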

def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available

    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    # Standard traversal is top down, we want to go bottom up, so use
    # the visitor to accumulate a list of nodes to visit, then
    # visit them in reverse order.
    def upload_tool_deps(deptool):

    tool.visit(upload_tool_deps)

    for deptool in reversed(todo):
        discovered_secondaryfiles = {}
        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles,
                                     cache=tool_dep_cache)
        document_loader.idx[deptool["id"]] = deptool
        for k,v in pm.items():
            toolmap[k] = v.resolved
        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          runtimeContext.project_uuid,
                                                          runtimeContext.force_docker_pull,
                                                          runtimeContext.tmp_outdir_prefix,
                                                          runtimeContext.match_local_docker,
                                                          runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

        logger.info("Using collection %s", exists["items"][0]["uuid"])
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
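# Note added for clarity (not in the original source): upload_workflow_collection()
# deduplicates by content. If a collection with the same portable data hash (and a
# matching name prefix) already exists in the target project it is reused rather
# than saved again, and the portable data hash is returned either way.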

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True,

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev
        self.git_info = git_info

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
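        # Note added for clarity (not in the original source): the effective
        # runner resources are resolved in this order: the built-in defaults
        # above, then an http://arvados.org/cwl#WorkflowRunnerResources hint in
        # the workflow, then an explicit submit_runner_ram passed in by the
        # caller (e.g. from the command line), which wins if nonzero.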

            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)

    def update_pipeline_component(self, record):

    def done(self, record):
        """Base method for handling a completed runner."""
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                        processStatus = "permanentFail"
                    processStatus = "success"
                processStatus = "permanentFail"

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
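            # Example added for clarity (hypothetical values): keepify() turns a
            # relative output location such as "dir/out.txt" into
            # "keep:<portable data hash of record['output']>/dir/out.txt".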
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
            self.arvrunner.output_callback(outputs, processStatus)