# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

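# Usage sketch: find_defaults() invokes `op` on every mapping in the document
# tree that carries a "default" key (without recursing into the match), e.g.
#
#     find_defaults(workflowobj, lambda d: print(d["default"]))
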
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],               # type: List[Dict[Text, Text]]
                   bindings=[],            # type: List[Dict[Text, Any]]
                   schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                   names=None,             # type: Names
                   requirements=requirements,  # type: List[Dict[Text, Any]]
                   hints=hints,            # type: List[Dict[Text, Any]]
                   resources={},           # type: Dict[str, int]
                   mutation_manager=None,  # type: Optional[MutationManager]
                   formatgraph=None,       # type: Optional[Graph]
                   make_fs_access=None,    # type: Type[StdFsAccess]
                   fs_access=None,         # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,  # type: float
                   debug=runtimeContext.debug,           # type: bool
                   js_console=runtimeContext.js_console,  # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                   loadListing="",         # type: Text
                   outdir="",              # type: Text
                   tmpdir="",              # type: Text
                   stagedir="",            # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"))

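# make_builder() fills in just enough of a cwltool Builder for
# builder.do_eval() to work on secondaryFiles expressions; the file-staging
# fields (outdir, tmpdir, stagedir) are left blank since no command line is
# being constructed here.
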
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                specs.append({"pattern": pattern})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = sf["basename"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                found.append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

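# Example of the pattern semantics handled above (hypothetical input): for a
# primary file "sample.bam" with secondaryFiles spec {"pattern": ".bai"},
# substitute() yields "sample.bam.bai"; a leading "^" in the pattern (e.g.
# "^.bai") strips the ".bam" extension first, yielding "sample.bai".
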
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)
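    # sc_result now holds the File/Directory references scandeps found.  The
    # helpers below split them into (a) local files that need uploading and
    # (b) collection UUIDs that must be resolved to portable data hashes.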
    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)
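    # Note: when a default points at a file that doesn't exist, the entire
    # "default" entry is deleted rather than uploading a dangling reference,
    # so the workflow runs as if no default had been given.
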
    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
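    # ArvPathMapper performs the actual upload: everything collected in sc is
    # written to Keep (one collection, per single_collection=True) and each
    # entry maps to a "keep:..." reference used by setloc() below.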
    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, v, True,
                    arvrunner.project_uuid,
                    arvrunner.runtimeContext.force_docker_pull,
                    arvrunner.runtimeContext.tmp_outdir_prefix)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash
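# Note: subprocess.check_output() returns bytes, so the value stored under
# "http://schema.org/version" may need to be decoded before the packed
# document is serialized to JSON.
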
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
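# For each tool document, FileUpdates pairs a map of original file locations
# to resolved "keep:" URIs with the secondaryFiles discovered at those
# locations; upload_workflow_deps builds one per step below.
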
def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
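# Collections are content-addressed, so when a collection with the same
# portable data hash (and matching name prefix) already exists in the target
# project, it is reused above instead of saving a duplicate.
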
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self
    def update_pipeline_component(self, record):
        pass
    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)