# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
import logging
import json
import copy

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

def remove_redundant_fields(obj):
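    """Strip derived fields ('path', 'nameext', 'nameroot', 'dirname').

    These are added by cwltool during processing and are not wanted when
    writing the record back out.
    """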
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
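    """Recursively walk a document and apply op() to every dict that has a 'default' field."""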
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext):
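    """Construct a cwltool Builder with just enough state to evaluate
    parameter references and expressions outside of a running step."""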
    return Builder(
        job=joborder,
        files=[],                      # type: List[Dict[Text, Text]]
        bindings=[],                   # type: List[Dict[Text, Any]]
        schemaDefs={},                 # type: Dict[Text, Dict[Text, Any]]
        names=None,                    # type: Names
        requirements=requirements,     # type: List[Dict[Text, Any]]
        hints=hints,                   # type: List[Dict[Text, Any]]
        resources={},                  # type: Dict[str, int]
        mutation_manager=None,         # type: Optional[MutationManager]
        formatgraph=None,              # type: Optional[Graph]
        make_fs_access=None,           # type: Type[StdFsAccess]
        fs_access=None,                # type: StdFsAccess
        job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
        timeout=runtimeContext.eval_timeout,                     # type: float
        debug=runtimeContext.debug,                              # type: bool
        js_console=runtimeContext.js_console,                    # type: bool
        force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
        loadListing="",                # type: Text
        outdir="",                     # type: Text
        tmpdir="",                     # type: Text
        stagedir="",                   # type: Text
    )

def search_schemadef(name, reqs):
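    """Look up a named type in any SchemaDefRequirement in reqs; return the type record, or None."""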
112 if r["class"] == "SchemaDefRequirement":
113 for sd in r["types"]:
114 if sd["name"] == name:
primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
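    """Recursively apply a secondaryFiles spec to 'primary' based on the input schema.

    Walks union, record, array and named schema types until File values are
    reached, evaluates the secondaryFiles patterns, verifies that required
    secondary files exist, and (when a dict is provided) records newly
    attached secondaryFiles in 'discovered', keyed by the primary file's
    location.
    """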
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

136 if "secondaryFiles" in inputschema:
137 # set secondaryFiles, may be inherited by compound types.
138 secondaryspec = inputschema["secondaryFiles"]
140 if (isinstance(inputschema["type"], (Mapping, Sequence)) and
141 not isinstance(inputschema["type"], basestring)):
142 # compound type (union, array, record)
143 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        primary["secondaryFiles"] = []
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(sf.get("required"), context=primary)

            if fsaccess.exists(sfpath):
                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)


def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
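    """Apply set_secondary() to each input parameter whose value in job_order is a mapping or sequence."""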
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
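# For example (hypothetical identifiers): "keep:zzzzz-4zz18-0123456789abcde/file.txt"
# matches collection_uuid_pattern, while "keep:99999999999999999999999999999999+99/file.txt"
# matches collection_pdh_pattern.  Group 1 is the collection, group 2 the path within it.
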
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
203 """Upload the dependencies of the workflowobj document to Keep.
205 Returns a pathmapper object mapping local paths to keep references. Also
206 does an in-place update of references in "workflowobj".
208 Use scandeps to find $import, $include, $schemas, run, File and Directory
209 fields that represent external references.
211 If workflowobj has an "id" field, this will reload the document to ensure
212 it is scanning the raw document prior to preprocessing.
    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext())
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

396 if "$schemas" in workflowobj:
398 for s in workflowobj["$schemas"]:
399 sch.append(mapper.mapper(s).resolved)
400 workflowobj["$schemas"] = sch
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed


def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext())
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

503 if "id" in job_order:
506 # Need to filter this out, gets added by cwltool when providing
507 # parameters on the command line.
508 if "job_order" in job_order:
509 del job_order["job_order"]
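# FileUpdates records, for one tool document, the mapping of original file
# locations to resolved "keep:" references, plus any secondaryFiles discovered
# for those files.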
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
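    """Upload Docker images and file dependencies for the workflow and each embedded tool.

    Returns a map of tool document ids to FileUpdates tuples describing how
    their file references were rewritten.
    """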
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map


def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
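    """Save the packed workflow as "workflow.cwl" in a collection.

    Reuses an existing collection with the same content and name prefix when
    one is found; otherwise saves a new collection.  Returns the portable
    data hash.
    """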
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

561 filters = [["portable_data_hash", "=", collection.portable_data_hash()],
562 ["name", "like", name+"%"]]
563 if arvrunner.project_uuid:
564 filters.append(["owner_uuid", "=", arvrunner.project_uuid])
565 exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
568 logger.info("Using collection %s", exists["items"][0]["uuid"])
570 collection.save_new(name=name,
571 owner_uuid=arvrunner.project_uuid,
572 ensure_unique_name=True,
573 num_retries=arvrunner.num_retries)
574 logger.info("Uploaded to %s", collection.manifest_locator())
    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = loadingContext.metadata.copy()
        loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)