# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy
from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
from . import done
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

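# Illustrative sketch (not part of the original module): given a File literal
# that was assigned an anonymous id, the two helpers above strip it back down
# to its content-bearing fields.
#
#   f = {"class": "File", "location": "_:b0a1...", "basename": "a.txt",
#        "path": "/tmp/a.txt", "nameroot": "a", "nameext": ".txt"}
#   trim_anonymous_location(f)   # removes the anonymous "_:..." location
#   remove_redundant_fields(f)   # removes "path", "nameext", "nameroot", "dirname"
#   # f is now {"class": "File", "basename": "a.txt"}
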
def find_defaults(d, op):
    """Walk a document tree and apply op() to every object that has a 'default' field."""
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

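# Illustrative sketch (not part of the original module): find_defaults() drives
# callbacks such as visit_default() in upload_dependencies() below, e.g.
#
#   doc = {"inputs": [{"id": "x", "type": "File",
#                      "default": {"class": "File", "location": "file:///d.txt"}}]}
#   find_defaults(doc, lambda obj: print(obj["default"]["location"]))
#   # prints: file:///d.txt
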
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
                 job=joborder,
                 files=[],                    # type: List[Dict[Text, Text]]
                 bindings=[],                 # type: List[Dict[Text, Any]]
                 schemaDefs={},               # type: Dict[Text, Dict[Text, Any]]
                 names=None,                  # type: Names
                 requirements=requirements,   # type: List[Dict[Text, Any]]
                 hints=hints,                 # type: List[Dict[Text, Any]]
                 resources={},                # type: Dict[str, int]
                 mutation_manager=None,       # type: Optional[MutationManager]
                 formatgraph=None,            # type: Optional[Graph]
                 make_fs_access=None,         # type: Type[StdFsAccess]
                 fs_access=None,              # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,                     # type: float
                 debug=runtimeContext.debug,                              # type: bool
                 js_console=runtimeContext.js_console,                    # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
                 loadListing="",              # type: Text
                 outdir="",                   # type: Text
                 tmpdir="",                   # type: Text
                 stagedir="",                 # type: Text
                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                )

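# Illustrative sketch (not part of the original module): the Builder exists so
# CWL expressions can be evaluated outside a running job, e.g. the "pattern"
# field of a secondaryFiles spec:
#
#   builder = make_builder({"inp": {"class": "File", "location": "file:///a.bam"}},
#                          hints=[], requirements=[],
#                          runtimeContext=ArvRuntimeContext(),
#                          metadata={"cwlVersion": "v1.1"})
#   builder.do_eval("$(self.basename).bai",
#                   context={"class": "File", "basename": "a.bam"})
#   # -> "a.bam.bai"
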
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None


primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                specs.append({"pattern": pattern})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = sf["basename"]
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                found.append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

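# Illustrative sketch (not part of the original module): cwltool's substitute()
# appends the pattern to the primary location, unless the pattern starts with
# "^", which strips one extension first:
#
#   substitute("file:///data/sample.bam", ".bai")   # -> "file:///data/sample.bam.bai"
#   substitute("file:///data/sample.bam", "^.bai")  # -> "file:///data/sample.bai"
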
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

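# Illustrative sketch (not part of the original module): given a hypothetical
# input schema and job order such as
#
#   inputs = [{"id": "#main/ref", "type": "File",
#              "secondaryFiles": [{"pattern": ".fai", "required": True}]}]
#   job_order = {"ref": {"class": "File", "location": "file:///data/genome.fa"}}
#
# discover_secondary_files() pairs inputs[0] with job_order["ref"] via
# shortname(), and set_secondary() then attaches
#   {"class": "File", "location": "file:///data/genome.fa.fai"}
# to the primary's "secondaryFiles" (or raises if a required file is missing).
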
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)

    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash

    tag_git_version(packed)

    return packed

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want that,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

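# Illustrative sketch (not part of the original module): merged_map ends up
# keyed by tool document id, mapping every local reference that was uploaded,
# e.g. (hypothetical paths and portable data hash)
#
#   {"file:///home/me/wf.cwl": FileUpdates(
#        resolved={"file:///home/me/script.py": "keep:9999...+99/script.py"},
#        secondaryFiles={})}
#
# packed_workflow() consumes this to rewrite "location" fields in the packed
# document.
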
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

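# Illustrative sketch (not part of the original module): because the lookup
# filters on the collection's portable data hash (a content address),
# re-uploading the same packed workflow reuses the existing copy instead of
# writing a duplicate:
#
#   pdh = upload_workflow_collection(arvrunner, "my-wf", packed)
#   pdh == upload_workflow_collection(arvrunner, "my-wf", packed)
#   # -> True; the second call logs "Using collection ..." and uploads nothing
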
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        return []

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)

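# Illustrative sketch (not part of the original module): keepify() rewrites
# output paths that are relative to the output collection, e.g. with
# record["output"] == "9999...+99" (hypothetical portable data hash):
#
#   {"location": "foo.txt"}       -> {"location": "keep:9999...+99/foo.txt"}
#   {"location": "keep:abc+5/x"}  -> unchanged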