# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy

from collections import namedtuple
from io import StringIO
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Mapping,
    MutableMapping,
    Sequence,
    MutableSequence,
    Optional,
    Set,
    Sized,
    Tuple,
    Type,
    Union,
    cast,
)
from cwltool.utils import (
    CWLObjectType,
    CWLOutputAtomType,
    CWLOutputType,
)

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
import arvados.util

from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext
from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')


def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, remove it.

    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)


def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
        job=joborder,
        files=[],                  # type: List[Dict[Text, Text]]
        bindings=[],               # type: List[Dict[Text, Any]]
        schemaDefs={},             # type: Dict[Text, Dict[Text, Any]]
        names=None,                # type: Names
        requirements=requirements, # type: List[Dict[Text, Any]]
        hints=hints,               # type: List[Dict[Text, Any]]
        resources={},              # type: Dict[str, int]
        mutation_manager=None,     # type: Optional[MutationManager]
        formatgraph=None,          # type: Optional[Graph]
        make_fs_access=None,       # type: Type[StdFsAccess]
        fs_access=None,            # type: StdFsAccess
        job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
        timeout=runtimeContext.eval_timeout,                     # type: float
        debug=runtimeContext.debug,                              # type: bool
        js_console=runtimeContext.js_console,                    # type: bool
        force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
        loadListing="",            # type: Text
        outdir="",                 # type: Text
        tmpdir="",                 # type: Text
        stagedir="",               # type: Text
        cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
        container_engine="docker"
    )


def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None


primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return
167 if "secondaryFiles" in inputschema:
168 # set secondaryFiles, may be inherited by compound types.
169 secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            # Nothing to do.
            return

        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = sf
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)

                if sfname is None:
                    continue

                p_location = primary["location"]
                if "/" in p_location:
                    sfpath = (
                        p_location[0 : p_location.rindex("/") + 1]
                        + sfname
                    )

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)
274 primary["secondaryFiles"] = cmap(found)
275 if discovered is not None:
276 discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)


def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)


def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
                        cache=None):
290 """Upload the dependencies of the workflowobj document to Keep.
292 Returns a pathmapper object mapping local paths to keep references. Also
293 does an in-place update of references in "workflowobj".
295 Use scandeps to find $import, $include, $schemas, run, File and Directory
296 fields that represent external references.
298 If workflowobj has an "id" field, this will reload the document to ensure
299 it is scanning the raw document prior to preprocessing.
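
    # Overall flow: scan the document for external references, resolve any
    # collection uuids to portable data hashes, upload local files with
    # ArvPathMapper, then rewrite 'location' fields to keep: references.
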
    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            if cache is not None and defrg in cache:
                return cache[defrg]
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            result = yamlloader.load(textIO)
            if cache is not None:
                cache[defrg] = result
            return result
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    with Perf(metrics, "scandeps include, location"):
        sc_result = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$include", "location")),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    with Perf(metrics, "scandeps $schemas"):
        optional_deps = scandeps(uri, scanobj,
                                 loadref_fields,
                                 set(("$schemas",)),
                                 loadref, urljoin=document_loader.fetcher.urljoin,
                                 nestdirs=False)

    if sc_result is None:
        sc_result = []

    if optional_deps is None:
        optional_deps = []

    if optional_deps:
        sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) > 1 and sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)

    with Perf(metrics, "collect uuids"):
        visit_class(workflowobj, ("File", "Directory"), collect_uuids)

    with Perf(metrics, "collect uploads"):
        visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    with Perf(metrics, "fetch_uuids"):
        while fetch_uuids:
            # For a large number of fetch_uuids, the API server may limit the
            # response size, so keep fetching until the API server has nothing
            # more to give us.
            lookups = arvrunner.api.collections().list(
                filters=[["uuid", "in", fetch_uuids]],
                count="none",
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=arvrunner.num_retries)

            if not lookups["items"]:
                break

            for l in lookups["items"]:
                uuid_map[l["uuid"]] = l["portable_data_hash"]

            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})
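
    # Defaults are treated as optional dependencies: if a default file is
    # missing, the workflow can still run (the input may be supplied at
    # runtime), so they are recorded in optional_deps instead of failing.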
    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}

    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    with Perf(metrics, "mapper"):
        mapper = ArvPathMapper(arvrunner, sc, "",
                               "keep:%s",
                               "keep:%s/%s",
                               name=name,
                               single_collection=True,
                               optional_deps=optional_deps)

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            addkeepref(p["location"])
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    with Perf(metrics, "setloc"):
        visit_class(workflowobj, ("File", "Directory"), setloc)
        visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                                            filters=[["portable_data_hash", "in", list(keeprefs)],
                                                                     ["owner_uuid", "=", runtimeContext.project_uuid]],
                                                            select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)
549 if "$schemas" in workflowobj:
551 for s in workflowobj["$schemas"]:
553 sch.append(mapper.mapper(s).resolved)
554 workflowobj["$schemas"] = sch


def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)


def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""
    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError(
                        "Embedded process object is missing required 'id' field, add an 'id' or upgrade to cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
610 if "location" in v and cur_id in merged_map:
611 if v["location"] in merged_map[cur_id].resolved:
612 v["location"] = merged_map[cur_id].resolved[v["location"]]
613 if v["location"] in merged_map[cur_id].secondaryFiles:
614 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
615 if v.get("class") == "DockerRequirement":
616 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
617 runtimeContext.project_uuid,
618 runtimeContext.force_docker_pull,
619 runtimeContext.tmp_outdir_prefix,
620 runtimeContext.match_local_docker,
621 runtimeContext.copy_deps)
        if isinstance(v, list):
            for d in v:
                visit(d, cur_id)

    visit(packed, None)

    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash

    tag_git_version(packed)

    return packed


def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want it to do that,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False,
                                    runtimeContext)
681 if "id" in job_order:

    # Need to filter this out; it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order


FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
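# FileUpdates records, for one tool document, the map from original file
# locations to resolved "keep:" locations, plus any secondaryFiles that were
# discovered for those files.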


def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available
    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}
    tool_dep_cache = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
                pm = upload_dependencies(arvrunner,
                                         "%s dependencies" % (shortname(deptool["id"])),
                                         document_loader,
                                         deptool,
                                         deptool["id"],
                                         False,
                                         runtimeContext,
                                         include_primary=False,
                                         discovered_secondaryfiles=discovered_secondaryfiles,
                                         cache=tool_dep_cache)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map


def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True,
                                                          runtimeContext.project_uuid,
                                                          runtimeContext.force_docker_pull,
                                                          runtimeContext.tmp_outdir_prefix,
                                                          runtimeContext.match_local_docker,
                                                          runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
849 if record["state"] == "Complete":
850 if record.get("exit_code") is not None:
851 if record["exit_code"] == 33:
852 processStatus = "UnsupportedRequirement"
853 elif record["exit_code"] == 0:
854 processStatus = "success"
856 processStatus = "permanentFail"
858 processStatus = "success"
860 processStatus = "permanentFail"

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)


# --- from cwltool ---
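#
# The scandeps() below is adapted from cwltool, with Perf metrics
# instrumentation added so dependency scanning shows up in the metrics log.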

CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"


def scandeps(
    base: str,
    doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
    reffields: Set[str],
    urlfields: Set[str],
    loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
    urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
    nestdirs: bool = True,
    do_normalize: bool = True,
) -> Optional[MutableSequence[CWLObjectType]]:
912 """Given a CWL document or input object, search for dependencies
913 (references to external files) of 'doc' and return them as a list
914 of File or Directory objects.
916 The 'base' is the base URL for relative references.
918 Looks for objects with 'class: File' or 'class: Directory' and
919 adds them to the list of dependencies.
921 Anything in 'urlfields' is also added as a File dependency.
923 Anything in 'reffields' (such as workflow step 'run') will be
924 added as a dependency and also loaded (using the 'loadref'
925 function) and recursively scanned for dependencies. Those
926 dependencies will be added as secondary files to the primary file.
928 If "nestdirs" is true, create intermediate directory objects when
929 a file is located in a subdirectory under the starting directory.
930 This is so that if the dependencies are materialized, they will
931 produce the same relative file system locations.
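    # For example, scanning {"class": "File", "location": "data/a.txt"} with
    # base "file:///wf/job.yml" and "location" in urlfields yields
    # [{"class": "File", "location": "file:///wf/data/a.txt"}].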
    r: Optional[MutableSequence[CWLObjectType]] = None
    if isinstance(doc, MutableMapping):
        if "id" in doc:
            if cast(str, doc["id"]).startswith("file://"):
                df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
                if base != df:
                    if r is None:
                        r = []
                    r.append({"class": "File", "location": df, "format": CWL_IANA})
                    base = df
950 if doc.get("class") in ("File", "Directory") and "location" in urlfields:
951 with Perf(metrics, "File or Directory with location"):
952 u = cast(Optional[str], doc.get("location", doc.get("path")))
953 if u and not u.startswith("_:"):
955 "class": doc["class"],
956 "location": urljoin(base, u),
957 } # type: CWLObjectType
958 if "basename" in doc:
959 deps["basename"] = doc["basename"]
960 if doc["class"] == "Directory" and "listing" in doc:
961 deps["listing"] = doc["listing"]
962 if doc["class"] == "File" and "secondaryFiles" in doc:
966 Union[CWLObjectType, MutableSequence[CWLObjectType]],
967 doc["secondaryFiles"],
977 deps["secondaryFiles"] = cast(
982 deps = nestdir(base, deps)
987 if doc["class"] == "Directory" and "listing" in doc:
990 cast(MutableSequence[CWLObjectType], doc["listing"]),
1002 elif doc["class"] == "File" and "secondaryFiles" in doc:
1005 cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),

        for k, v in doc.items():
            if k in reffields:
                with Perf(metrics, "k in reffields"):
                    for u2 in aslist(v):
                        if isinstance(u2, MutableMapping):
                            sf = scandeps(
                                base,
                                u2,
                                reffields,
                                urlfields,
                                loadref,
                                urljoin=urljoin,
                                nestdirs=nestdirs,
                                do_normalize=False,
                            )
                            if sf:
                                if r is None:
                                    r = []
                                r.extend(sf)
                        else:
                            subid = urljoin(base, u2)
                            basedf, _ = urllib.parse.urldefrag(base)
                            subiddf, _ = urllib.parse.urldefrag(subid)
                            if basedf == subiddf:
                                continue
                            sub = cast(
                                Union[MutableSequence[CWLObjectType], CWLObjectType],
                                loadref(base, u2),
                            )
                            deps2 = {
                                "class": "File",
                                "location": subid,
                                "format": CWL_IANA,
                            }  # type: CWLObjectType
                            sf = scandeps(
                                subid,
                                sub,
                                reffields,
                                urlfields,
                                loadref,
                                urljoin=urljoin,
                                nestdirs=nestdirs,
                                do_normalize=False,
                            )
                            if sf:
                                deps2["secondaryFiles"] = cast(
                                    MutableSequence[CWLOutputAtomType], mergedirs(sf)
                                )
                            if nestdirs:
                                deps2 = nestdir(base, deps2)
                            if r is None:
                                r = []
                            r.append(deps2)
            elif k in urlfields and k != "location":
                with Perf(metrics, "k in urlfields"):
                    for u3 in aslist(v):
                        deps = {"class": "File", "location": urljoin(base, u3)}
                        if nestdirs:
                            deps = nestdir(base, deps)
                        if r is None:
                            r = []
                        r.append(deps)
            elif doc.get("class") in ("File", "Directory") and k in (
                "listing",
                "secondaryFiles",
            ):
                # should be handled earlier.
                pass
            else:
                with Perf(metrics, "k is something else"):
                    sf = scandeps(
                        base,
                        cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
                        reffields,
                        urlfields,
                        loadref,
                        urljoin=urljoin,
                        nestdirs=nestdirs,
                        do_normalize=False,
                    )
                    if sf:
                        if r is None:
                            r = []
                        r.extend(sf)
    elif isinstance(doc, MutableSequence):
        with Perf(metrics, "d in doc"):
            for d in doc:
                sf = scandeps(
                    base,
                    d,
                    reffields,
                    urlfields,
                    loadref,
                    urljoin=urljoin,
                    nestdirs=nestdirs,
                    do_normalize=False,
                )
                if sf:
                    if r is None:
                        r = []
                    r.extend(sf)

    if r and do_normalize:
        normalizeFilesDirs(r)

    return r