1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
14 from functools import partial
18 from collections import namedtuple
19 from io import StringIO
39 from cwltool.utils import (
45 if os.name == "posix" and sys.version_info[0] < 3:
46 import subprocess32 as subprocess
50 from schema_salad.sourceline import SourceLine, cmap
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55 shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
64 import arvados.collection
66 from .util import collectionUUID
67 from ruamel.yaml import YAML
68 from ruamel.yaml.comments import CommentedMap, CommentedSeq
70 import arvados_cwl.arvdocker
71 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
72 from ._version import __version__
74 from . context import ArvRuntimeContext
75 from .perf import Perf
77 logger = logging.getLogger('arvados.cwl-runner')
78 metrics = logging.getLogger('arvados.cwl-runner.metrics')
# NOTE(review): this is an elided numbered listing (the inline line numbers skip);
# statements between the visible lines are not shown, so annotations are hedged.
80 def trim_anonymous_location(obj):
81 """Remove 'location' field from File and Directory literals.
83 To make internal handling easier, literals are assigned a random id for
84 'location'. However, when writing the record back out, this can break
85 reproducibility. Since it is valid for literals not to have a 'location'
# Anonymous literals are identified by a synthetic "_:"-prefixed location id;
# presumably the hidden body deletes obj["location"] in that case — TODO confirm.
90 if obj.get("location", "").startswith("_:"):
# Strip derived filename fields from a File/Directory object; these are
# recomputable from 'location' and only add noise when serialized.
# NOTE(review): loop body elided in this listing — presumably pops each field.
94 def remove_redundant_fields(obj):
95 for field in ("path", "nameext", "nameroot", "dirname"):
# Recursively walk a nested list/dict structure 'd', applying callback 'op'
# somewhere during traversal (lines elided; presumably op is invoked on nodes
# carrying a "default" key — TODO confirm against full source).
100 def find_defaults(d, op):
101 if isinstance(d, list):
104 elif isinstance(d, dict):
# viewvalues comes from future.utils (py2/py3 compatible dict-values view).
108 for i in viewvalues(d):
# Construct a cwltool Builder for expression evaluation (e.g. secondaryFiles
# patterns) outside of normal job execution. Most fields are inert placeholders;
# only joborder/hints/requirements and a few runtimeContext knobs are meaningful.
# NOTE(review): the Builder(...) constructor call line itself is elided here.
111 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
114 files=[], # type: List[Dict[Text, Text]]
115 bindings=[], # type: List[Dict[Text, Any]]
116 schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
117 names=None, # type: Names
118 requirements=requirements, # type: List[Dict[Text, Any]]
119 hints=hints, # type: List[Dict[Text, Any]]
120 resources={}, # type: Dict[str, int]
121 mutation_manager=None, # type: Optional[MutationManager]
122 formatgraph=None, # type: Optional[Graph]
123 make_fs_access=None, # type: Type[StdFsAccess]
124 fs_access=None, # type: StdFsAccess
125 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
126 timeout=runtimeContext.eval_timeout, # type: float
127 debug=runtimeContext.debug, # type: bool
128 js_console=runtimeContext.js_console, # type: bool
129 force_docker_pull=runtimeContext.force_docker_pull, # type: bool
130 loadListing="", # type: Text
131 outdir="", # type: Text
132 tmpdir="", # type: Text
133 stagedir="", # type: Text
# Prefer the pre-update original cwlVersion so version-dependent expression
# semantics (e.g. v1.0 secondaryFiles) match what the author wrote.
134 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
135 container_engine="docker"
# Look up a named type in any SchemaDefRequirement found in 'reqs'
# (an iterable of requirement/hint dicts). Returns the matching type
# definition (return statement elided in this listing).
138 def search_schemadef(name, reqs):
140 if r["class"] == "SchemaDefRequirement":
141 for sd in r["types"]:
142 if sd["name"] == name:
# CWL primitive type names that cannot carry secondaryFiles; used by
# set_secondary to stop recursion. (Literal continues on elided lines.)
146 primitive_types_set = frozenset(("null", "boolean", "int", "long",
147 "float", "double", "string", "record",
# Recursively resolve the secondaryFiles specification for one input schema
# against the corresponding value in the job order ('primary'), recording any
# newly discovered secondary files into 'discovered' (location -> files).
# Mutates 'primary' in place. NOTE(review): listing is elided; several
# branches and loop bodies are only partially visible.
150 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
151 if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
152 # union type, collect all possible secondaryFiles
153 for i in inputschema:
154 set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
# Normalize the shorthand "File" into a schema dict so the code below can
# treat all schemas uniformly.
157 if inputschema == "File":
158 inputschema = {"type": "File"}
# A bare string type name refers to a SchemaDefRequirement entry; hints are
# searched after requirements (reversed so requirements take precedence).
160 if isinstance(inputschema, basestring):
161 sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
167 if "secondaryFiles" in inputschema:
168 # set secondaryFiles, may be inherited by compound types.
169 secondaryspec = inputschema["secondaryFiles"]
171 if (isinstance(inputschema["type"], (Mapping, Sequence)) and
172 not isinstance(inputschema["type"], basestring)):
173 # compound type (union, array, record)
174 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
176 elif (inputschema["type"] == "record" and
177 isinstance(primary, Mapping)):
179 # record type, find secondary files associated with fields.
181 for f in inputschema["fields"]:
182 p = primary.get(shortname(f["name"]))
184 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
186 elif (inputschema["type"] == "array" and
187 isinstance(primary, Sequence)):
189 # array type, find secondary files of elements
192 set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
194 elif (inputschema["type"] == "File" and
195 isinstance(primary, Mapping) and
196 primary.get("class") == "File"):
# Nothing to do if the file already has secondaryFiles or no spec applies.
198 if "secondaryFiles" in primary or not secondaryspec:
203 # Found a file, check for secondaryFiles
206 primary["secondaryFiles"] = secondaryspec
# Phase 1: normalize the spec entries into a uniform list of
# {"pattern": ..., "required": ...} dicts ('specs' built on elided lines).
207 for i, sf in enumerate(aslist(secondaryspec)):
208 if builder.cwlVersion == "v1.0":
211 pattern = sf["pattern"]
214 if isinstance(pattern, list):
215 specs.extend(pattern)
216 elif isinstance(pattern, dict):
217 specs.append(pattern)
218 elif isinstance(pattern, str):
# v1.0 semantics: secondary files given as plain patterns are required.
219 if builder.cwlVersion == "v1.0":
220 specs.append({"pattern": pattern, "required": True})
222 specs.append({"pattern": pattern, "required": sf.get("required")})
224 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
225 "Expression must return list, object, string or null")
# Phase 2: evaluate each normalized spec and collect existing files into
# 'found' (initialized on an elided line).
228 for i, sf in enumerate(specs):
229 if isinstance(sf, dict):
230 if sf.get("class") == "File":
232 if sf.get("location") is None:
233 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
234 "File object is missing 'location': %s" % sf)
235 sfpath = sf["location"]
238 pattern = sf["pattern"]
239 required = sf.get("required")
240 elif isinstance(sf, str):
244 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
245 "Expression must return list, object, string or null")
247 if pattern is not None:
# Patterns containing CWL expressions are evaluated; plain patterns use
# the standard suffix-substitution rules against the primary basename.
248 if "${" in pattern or "$(" in pattern:
249 sfname = builder.do_eval(pattern, context=primary)
251 sfname = substitute(primary["basename"], pattern)
256 if isinstance(sfname, str):
# Resolve the secondary file relative to the primary's directory.
257 p_location = primary["location"]
258 if "/" in p_location:
260 p_location[0 : p_location.rindex("/") + 1]
# 'required' may itself be a CWL expression.
264 required = builder.do_eval(required, context=primary)
266 if isinstance(sfname, list) or isinstance(sfname, dict):
267 each = aslist(sfname)
269 if required and not fsaccess.exists(e.get("location")):
270 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
271 "Required secondary file '%s' does not exist" % e.get("location"))
274 if isinstance(sfname, str):
275 if fsaccess.exists(sfpath):
276 if pattern is not None:
277 found.append({"location": sfpath, "class": "File"})
281 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
282 "Required secondary file '%s' does not exist" % sfpath)
# cmap preserves source-line info for later error reporting.
284 primary["secondaryFiles"] = cmap(found)
285 if discovered is not None:
286 discovered[primary["location"]] = primary["secondaryFiles"]
# Non-primitive named types recurse one more level.
287 elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
288 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
# Apply set_secondary to every input parameter in 'inputs' whose value in
# 'job_order' is a compound value (File/Directory dict or list thereof).
# 'discovered', if given, accumulates location -> secondaryFiles mappings.
290 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
291 for inputschema in inputs:
292 primary = job_order.get(shortname(inputschema["id"]))
293 if isinstance(primary, (Mapping, Sequence)):
294 set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
# NOTE(review): heavily elided listing — helper definitions (loadref, setloc,
# addkeepref), several initializations, and control-flow lines are missing.
# Annotations below are limited to what the visible lines establish.
296 def upload_dependencies(arvrunner, name, document_loader,
297 workflowobj, uri, loadref_run, runtimeContext,
298 include_primary=True, discovered_secondaryfiles=None,
300 """Upload the dependencies of the workflowobj document to Keep.
302 Returns a pathmapper object mapping local paths to keep references. Also
303 does an in-place update of references in "workflowobj".
305 Use scandeps to find $import, $include, $schemas, run, File and Directory
306 fields that represent external references.
308 If workflowobj has an "id" field, this will reload the document to ensure
309 it is scanning the raw document prior to preprocessing.
# loadref (definition elided): fetch and parse a referenced document,
# memoizing results in 'cache' keyed by the defragged URI.
314 joined = document_loader.fetcher.urljoin(b, u)
315 defrg, _ = urllib.parse.urldefrag(joined)
316 if defrg not in loaded:
318 if cache is not None and defrg in cache:
320 # Use fetch_text to get raw file (before preprocessing).
321 text = document_loader.fetch_text(defrg)
322 if isinstance(text, bytes):
323 textIO = StringIO(text.decode('utf-8'))
325 textIO = StringIO(text)
326 yamlloader = YAML(typ='safe', pure=True)
327 result = yamlloader.load(textIO)
328 if cache is not None:
329 cache[defrg] = result
# Follow "run" references only when loadref_run is set.
335 loadref_fields = set(("$import", "run"))
337 loadref_fields = set(("$import",))
339 scanobj = workflowobj
340 if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
341 defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
342 if cache is not None and defrg not in cache:
343 # if we haven't seen this file before, want raw file
344 # content (before preprocessing) to ensure that external
345 # references like $include haven't already been inlined.
346 scanobj = loadref("", workflowobj["id"])
350 with Perf(metrics, "scandeps include, location"):
351 sc_result = scandeps(uri, scanobj,
353 set(("$include", "location")),
354 loadref, urljoin=document_loader.fetcher.urljoin,
# $schemas references are best-effort; collected separately as optional.
357 with Perf(metrics, "scandeps $schemas"):
358 optional_deps = scandeps(uri, scanobj,
361 loadref, urljoin=document_loader.fetcher.urljoin,
364 if sc_result is None:
367 if optional_deps is None:
371 sc_result.extend(optional_deps)
376 def collect_uuids(obj):
377 loc = obj.get("location", "")
380 # Collect collection uuids that need to be resolved to
381 # portable data hashes
382 gp = collection_uuid_pattern.match(loc)
384 uuids[gp.groups()[0]] = obj
385 if collectionUUID in obj:
386 uuids[obj[collectionUUID]] = obj
388 def collect_uploads(obj):
389 loc = obj.get("location", "")
393 if sp[0] in ("file", "http", "https"):
394 # Record local files than need to be uploaded,
395 # don't include file literals, keep references, etc.
399 with Perf(metrics, "collect uuids"):
400 visit_class(workflowobj, ("File", "Directory"), collect_uuids)
402 with Perf(metrics, "collect uploads"):
403 visit_class(sc_result, ("File", "Directory"), collect_uploads)
405 # Resolve any collection uuids we found to portable data hashes
406 # and assign them to uuid_map
408 fetch_uuids = list(uuids.keys())
409 with Perf(metrics, "fetch_uuids"):
411 # For a large number of fetch_uuids, API server may limit
412 # response size, so keep fetching from API server has nothing
414 lookups = arvrunner.api.collections().list(
415 filters=[["uuid", "in", fetch_uuids]],
417 select=["uuid", "portable_data_hash"]).execute(
418 num_retries=arvrunner.num_retries)
420 if not lookups["items"]:
423 for l in lookups["items"]:
424 uuid_map[l["uuid"]] = l["portable_data_hash"]
# Drop resolved uuids and loop again until the server returns nothing.
426 fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
428 normalizeFilesDirs(sc)
# Include or exclude the primary document itself based on include_primary
# (the conditional line is elided between 431 and 433).
430 if "id" in workflowobj:
431 defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
433 # make sure it's included
434 sc.append({"class": "File", "location": defrg})
436 # make sure it's excluded
437 sc = [d for d in sc if d.get("location") != defrg]
# Files referenced only from input defaults are treated as optional:
# a missing default should not fail the upload.
439 def visit_default(obj):
440 def defaults_are_optional(f):
441 if "location" not in f and "path" in f:
442 f["location"] = f["path"]
444 normalizeFilesDirs(f)
445 optional_deps.append(f)
446 visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
448 find_defaults(workflowobj, visit_default)
451 def discover_default_secondary_files(obj):
452 builder_job_order = {}
453 for t in obj["inputs"]:
454 builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
455 # Need to create a builder object to evaluate expressions.
456 builder = make_builder(builder_job_order,
457 obj.get("hints", []),
458 obj.get("requirements", []),
461 discover_secondary_files(arvrunner.fs_access,
# Run discovery against a deep copy so the original document's defaults
# are not mutated by secondaryFiles resolution.
467 copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
468 visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
470 for d in list(discovered):
471 # Only interested in discovered secondaryFiles which are local
472 # files that need to be uploaded.
473 if d.startswith("file:"):
474 sc.extend(discovered[d])
478 with Perf(metrics, "mapper"):
479 mapper = ArvPathMapper(arvrunner, sc, "",
483 single_collection=True,
484 optional_deps=optional_deps)
# addkeepref (definition elided): record the collection PDH of a keep: ref
# in 'keeprefs' for the later copy_deps pass.
488 if k.startswith("keep:"):
489 keeprefs.add(collection_pdh_pattern.match(k).group(1))
# setloc (definition elided): rewrite each File/Directory location to its
# mapped keep reference and validate uuid/PDH consistency.
492 loc = p.get("location")
493 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
494 p["location"] = mapper.mapper(p["location"]).resolved
495 addkeepref(p["location"])
501 if collectionUUID in p:
502 uuid = p[collectionUUID]
503 if uuid not in uuid_map:
504 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
505 "Collection uuid %s not found" % uuid)
506 gp = collection_pdh_pattern.match(loc)
507 if gp and uuid_map[uuid] != gp.groups()[0]:
508 # This file entry has both collectionUUID and a PDH
509 # location. If the PDH doesn't match the one returned
510 # the API server, raise an error.
511 raise SourceLine(p, "location", validate.ValidationException).makeError(
512 "Expected collection uuid %s to be %s but API server reported %s" % (
513 uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
515 gp = collection_uuid_pattern.match(loc)
517 # Not a uuid pattern (must be a pdh pattern)
518 addkeepref(p["location"])
521 uuid = gp.groups()[0]
522 if uuid not in uuid_map:
523 raise SourceLine(p, "location", validate.ValidationException).makeError(
524 "Collection uuid %s not found" % uuid)
# Rewrite keep:<uuid>/path to keep:<pdh>/path, preserving subpath if any.
525 p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
526 p[collectionUUID] = uuid
528 with Perf(metrics, "setloc"):
529 visit_class(workflowobj, ("File", "Directory"), setloc)
530 visit_class(discovered, ("File", "Directory"), setloc)
532 if discovered_secondaryfiles is not None:
534 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
536 if runtimeContext.copy_deps:
537 # Find referenced collections and copy them into the
538 # destination project, for easy sharing.
539 already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
540 filters=[["portable_data_hash", "in", list(keeprefs)],
541 ["owner_uuid", "=", runtimeContext.project_uuid]],
542 select=["uuid", "portable_data_hash", "created_at"]))
544 keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
546 col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
547 order="created_at desc",
548 select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
550 if len(col["items"]) == 0:
551 logger.warning("Cannot find collection with portable data hash %s", kr)
553 col = col["items"][0]
# Copy is best-effort: failures are logged, not raised (see except below).
555 arvrunner.api.collections().create(body={"collection": {
556 "owner_uuid": runtimeContext.project_uuid,
558 "description": col["description"],
559 "properties": col["properties"],
560 "portable_data_hash": col["portable_data_hash"],
561 "manifest_text": col["manifest_text"],
562 "storage_classes_desired": col["storage_classes_desired"],
563 "trash_at": col["trash_at"]
564 }}, ensure_unique_name=True).execute()
565 except Exception as e:
566 logger.warning("Unable copy collection to destination: %s", e)
# Rewrite $schemas references to their mapped (uploaded) locations.
568 if "$schemas" in workflowobj:
570 for s in workflowobj["$schemas"]:
572 sch.append(mapper.mapper(s).resolved)
573 workflowobj["$schemas"] = sch
# NOTE(review): elided listing — the else-branch line between 593 and 595
# (no DockerRequirement case) is missing.
578 def upload_docker(arvrunner, tool, runtimeContext):
579 """Uploads Docker images used in CommandLineTool objects."""
581 if isinstance(tool, CommandLineTool):
582 (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
# dockerOutputDirectory is only supported with the containers API.
584 if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
585 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
586 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
588 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
589 runtimeContext.project_uuid,
590 runtimeContext.force_docker_pull,
591 runtimeContext.tmp_outdir_prefix,
592 runtimeContext.match_local_docker,
593 runtimeContext.copy_deps)
# Without an explicit DockerRequirement, fall back to the matching
# arvados/jobs image for this release.
595 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
597 runtimeContext.project_uuid,
598 runtimeContext.force_docker_pull,
599 runtimeContext.tmp_outdir_prefix,
600 runtimeContext.match_local_docker,
601 runtimeContext.copy_deps)
# Workflows recurse into each step's embedded tool.
602 elif isinstance(tool, cwltool.workflow.Workflow):
604 upload_docker(arvrunner, s.embedded_tool, runtimeContext)
# NOTE(review): elided listing — the visit() recursion over dict/list values,
# the git_info loop header, and the return statement are missing.
607 def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
608 """Create a packed workflow.
610 A "packed" workflow is one where all the components have been combined into a single document."""
613 packed = pack(arvrunner.loadingContext, tool.tool["id"],
614 rewrite_out=rewrites,
615 loader=tool.doc_loader)
# merged_map is keyed by original document ids, so map packed ids back.
617 rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
619 def visit(v, cur_id):
620 if isinstance(v, dict):
621 if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
622 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
623 raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
625 cur_id = rewrite_to_orig.get(v["id"], v["id"])
626 if "path" in v and "location" not in v:
627 v["location"] = v["path"]
# Rewrite file references and attach discovered secondaryFiles using the
# mappings recorded during upload_dependencies.
629 if "location" in v and cur_id in merged_map:
630 if v["location"] in merged_map[cur_id].resolved:
631 v["location"] = merged_map[cur_id].resolved[v["location"]]
632 if v["location"] in merged_map[cur_id].secondaryFiles:
633 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
# Pin each DockerRequirement to a concrete collection PDH.
634 if v.get("class") == "DockerRequirement":
635 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
636 runtimeContext.project_uuid,
637 runtimeContext.force_docker_pull,
638 runtimeContext.tmp_outdir_prefix,
639 runtimeContext.match_local_docker,
640 runtimeContext.copy_deps)
643 if isinstance(v, list):
# Annotate the packed document with git provenance metadata.
650 packed[g] = git_info[g]
# Record the source git commit hash when the workflow came from a local
# checkout; errors from git are silently ignored (see except below).
655 def tag_git_version(packed):
656 if tool.tool["id"].startswith("file://"):
657 path = os.path.dirname(tool.tool["id"][7:])
659 githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
660 except (OSError, subprocess.CalledProcessError):
663 packed["http://schema.org/version"] = githash
# NOTE(review): elided listing — the fill_in_defaults/make_builder argument
# lists, the upload_dependencies call arguments, and the return are missing.
666 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
667 """Upload local files referenced in the input object and return updated input
668 object with 'location' updated to the proper keep references.
671 # Make a copy of the job order and set defaults.
672 builder_job_order = copy.copy(job_order)
674 # fill_in_defaults throws an error if there are any
675 # missing required parameters, we don't want it to do that
676 # so make them all optional.
677 inputs_copy = copy.deepcopy(tool.tool["inputs"])
678 for i in inputs_copy:
679 if "null" not in i["type"]:
680 i["type"] = ["null"] + aslist(i["type"])
682 fill_in_defaults(inputs_copy,
685 # Need to create a builder object to evaluate expressions.
686 builder = make_builder(builder_job_order,
691 # Now update job_order with secondaryFiles
692 discover_secondary_files(arvrunner.fs_access,
697 jobmapper = upload_dependencies(arvrunner,
701 job_order.get("id", "#"),
# Drop bookkeeping keys that should not be part of the submitted input.
705 if "id" in job_order:
708 # Need to filter this out, gets added by cwltool when providing
709 # parameters on the command line.
710 if "job_order" in job_order:
711 del job_order["job_order"]
# Per-document record of (original location -> keep location) rewrites and
# discovered secondaryFiles; stored in merged_map keyed by document id.
715 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
# NOTE(review): elided listing — initialization of merged_map/tool_dep_cache,
# some upload_dependencies arguments, and the return statement are missing.
717 def upload_workflow_deps(arvrunner, tool, runtimeContext):
718 # Ensure that Docker images needed by this workflow are available
720 with Perf(metrics, "upload_docker"):
721 upload_docker(arvrunner, tool, runtimeContext)
723 document_loader = tool.doc_loader
# Upload each tool document's file dependencies and record the resulting
# location rewrites in merged_map for later packing.
727 def upload_tool_deps(deptool):
729 discovered_secondaryfiles = {}
730 with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
731 pm = upload_dependencies(arvrunner,
732 "%s dependencies" % (shortname(deptool["id"])),
738 include_primary=False,
739 discovered_secondaryfiles=discovered_secondaryfiles,
740 cache=tool_dep_cache)
741 document_loader.idx[deptool["id"]] = deptool
743 for k,v in pm.items():
744 toolmap[k] = v.resolved
745 merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
# visit applies upload_tool_deps to every embedded tool in the workflow.
747 tool.visit(upload_tool_deps)
# NOTE(review): elided listing — the try: line and one positional argument of
# arv_docker_get_image are missing between the visible lines.
751 def arvados_jobs_image(arvrunner, img, runtimeContext):
752 """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
755 return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
757 runtimeContext.project_uuid,
758 runtimeContext.force_docker_pull,
759 runtimeContext.tmp_outdir_prefix,
760 runtimeContext.match_local_docker,
761 runtimeContext.copy_deps)
# Wrap any failure (pull, upload, API) in a single user-facing error.
762 except Exception as e:
763 raise Exception("Docker image %s is not available\n%s" % (img, e) )
# Save a packed workflow document as a one-file Keep collection and return
# its portable data hash. Reuses an existing collection when one with the
# same content (and, if set, owner project) already exists.
# NOTE(review): elided listing — the branch between the exists-check and
# save_new (lines 778-781) is partially missing.
766 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
767 collection = arvados.collection.Collection(api_client=arvrunner.api,
768 keep_client=arvrunner.keep_client,
769 num_retries=arvrunner.num_retries)
770 with collection.open("workflow.cwl", "w") as f:
771 f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
# Look for an existing identical collection by content hash and name prefix.
773 filters = [["portable_data_hash", "=", collection.portable_data_hash()],
774 ["name", "like", name+"%"]]
775 if runtimeContext.project_uuid:
776 filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
777 exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
780 logger.info("Using collection %s", exists["items"][0]["uuid"])
782 collection.save_new(name=name,
783 owner_uuid=runtimeContext.project_uuid,
784 ensure_unique_name=True,
785 num_retries=arvrunner.num_retries)
786 logger.info("Uploaded to %s", collection.manifest_locator())
788 return collection.portable_data_hash()
# NOTE(review): elided listing — parts of __init__'s parameter list, the
# job() method header, try/except scaffolding in done(), and several
# else-branches are missing between the visible lines.
791 class Runner(Process):
792 """Base class for runner processes, which submit an instance of
793 arvados-cwl-runner and wait for the final result."""
795 def __init__(self, runner, updated_tool,
796 tool, loadingContext, enable_reuse,
797 output_name, output_tags, submit_runner_ram=0,
798 name=None, on_error=None, submit_runner_image=None,
799 intermediate_output_ttl=0, merged_map=None,
800 priority=None, secret_store=None,
801 collection_cache_size=256,
802 collection_cache_is_default=True,
# Copy the loading context so mutations here don't leak to the caller.
805 loadingContext = loadingContext.copy()
806 loadingContext.metadata = updated_tool.metadata.copy()
808 super(Runner, self).__init__(updated_tool.tool, loadingContext)
810 self.arvrunner = runner
811 self.embedded_tool = tool
812 self.job_order = None
815 # If reuse is permitted by command line arguments but
816 # disabled by the workflow itself, disable it.
817 reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
819 enable_reuse = reuse_req["enableReuse"]
820 self.enable_reuse = enable_reuse
822 self.final_output = None
823 self.output_name = output_name
824 self.output_tags = output_tags
826 self.on_error = on_error
# Default runner image is pinned to this package's version.
827 self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
828 self.intermediate_output_ttl = intermediate_output_ttl
829 self.priority = priority
830 self.secret_store = secret_store
831 self.enable_dev = loadingContext.enable_dev
832 self.git_info = git_info
834 self.submit_runner_cores = 1
835 self.submit_runner_ram = 1024 # default 1 GiB
836 self.collection_cache_size = collection_cache_size
# The workflow may declare its own runner resource needs via the
# WorkflowRunnerResources extension; those override the defaults above.
838 runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
839 if runner_resource_req:
840 if runner_resource_req.get("coresMin"):
841 self.submit_runner_cores = runner_resource_req["coresMin"]
842 if runner_resource_req.get("ramMin"):
843 self.submit_runner_ram = runner_resource_req["ramMin"]
844 if runner_resource_req.get("keep_cache") and collection_cache_is_default:
845 self.collection_cache_size = runner_resource_req["keep_cache"]
847 if submit_runner_ram:
848 # Command line / initializer overrides default and/or spec from workflow
849 self.submit_runner_ram = submit_runner_ram
851 if self.submit_runner_ram <= 0:
852 raise Exception("Value of submit-runner-ram must be greater than zero")
854 if self.submit_runner_cores <= 0:
855 raise Exception("Value of submit-runner-cores must be greater than zero")
857 self.merged_map = merged_map or {}
# job() (header elided): store the job order and initialize the job.
860 job_order, # type: Mapping[Text, Text]
861 output_callbacks, # type: Callable[[Any, Any], Any]
862 runtimeContext # type: RuntimeContext
863 ): # type: (...) -> Generator[Any, None, None]
864 self.job_order = job_order
865 self._init_job(job_order, runtimeContext)
# Subclass hook; body elided in this listing.
868 def update_pipeline_component(self, record):
871 def done(self, record):
872 """Base method for handling a completed runner."""
# Map the container exit code to a CWL process status; 33 is the
# arvados-cwl-runner convention for UnsupportedRequirement.
875 if record["state"] == "Complete":
876 if record.get("exit_code") is not None:
877 if record["exit_code"] == 33:
878 processStatus = "UnsupportedRequirement"
879 elif record["exit_code"] == 0:
880 processStatus = "success"
882 processStatus = "permanentFail"
884 processStatus = "success"
886 processStatus = "permanentFail"
# On failure, surface the tail of the runner's log for diagnosis.
890 if processStatus == "permanentFail":
891 logc = arvados.collection.CollectionReader(record["log"],
892 api_client=self.arvrunner.api,
893 keep_client=self.arvrunner.keep_client,
894 num_retries=self.arvrunner.num_retries)
895 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
897 self.final_output = record["output"]
898 outc = arvados.collection.CollectionReader(self.final_output,
899 api_client=self.arvrunner.api,
900 keep_client=self.arvrunner.keep_client,
901 num_retries=self.arvrunner.num_retries)
902 if "cwl.output.json" in outc:
903 with outc.open("cwl.output.json", "rb") as f:
905 outputs = json.loads(f.read().decode())
# Rewrite relative output paths to fully-qualified keep: references
# rooted at the runner's output collection.
906 def keepify(fileobj):
907 path = fileobj["location"]
908 if not path.startswith("keep:"):
909 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
910 adjustFileObjs(outputs, keepify)
911 adjustDirObjs(outputs, keepify)
# Any failure while reading the output object fails the whole run.
913 logger.exception("[%s] While getting final output object", self.name)
914 self.arvrunner.output_callback({}, "permanentFail")
916 self.arvrunner.output_callback(outputs, processStatus)