# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy
from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import Builder, substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

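# Illustrative sketch (comments only, not executed): find_defaults walks a
# parsed CWL document and applies "op" to every mapping that carries a
# "default" key. The document fragment below is hypothetical.
#
#   doc = {"inputs": [{"id": "ref", "type": "File",
#                      "default": {"class": "File", "location": "ref.fa"}}]}
#   find_defaults(doc, lambda d: print(d["default"]["location"]))
#   # prints: ref.fa
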
def make_builder(joborder, hints, requirements, runtimeContext):
    return Builder(job=joborder,
                   files=[],               # type: List[Dict[Text, Text]]
                   bindings=[],            # type: List[Dict[Text, Any]]
                   schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                   names=None,             # type: Names
                   requirements=requirements,  # type: List[Dict[Text, Any]]
                   hints=hints,            # type: List[Dict[Text, Any]]
                   resources={},           # type: Dict[str, int]
                   mutation_manager=None,  # type: Optional[MutationManager]
                   formatgraph=None,       # type: Optional[Graph]
                   make_fs_access=None,    # type: Type[StdFsAccess]
                   fs_access=None,         # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,           # type: float
                   debug=runtimeContext.debug,                    # type: bool
                   js_console=runtimeContext.js_console,          # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                   loadListing="",         # type: Text
                   outdir="",              # type: Text
                   tmpdir="",              # type: Text
                   stagedir="",            # type: Text
                   )

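# Illustrative sketch (comments only, not executed): the Builder constructed
# here is only used to evaluate CWL parameter references, e.g. secondaryFiles
# patterns. "runtime_ctx" is a hypothetical, fully initialized
# ArvRuntimeContext.
#
#   builder = make_builder({"threads": 4}, [], [], runtime_ctx)
#   builder.do_eval("$(inputs.threads)")   # -> 4
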
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

136 if "secondaryFiles" in inputschema:
137 # set secondaryFiles, may be inherited by compound types.
138 secondaryspec = inputschema["secondaryFiles"]
140 if (isinstance(inputschema["type"], (Mapping, Sequence)) and
141 not isinstance(inputschema["type"], basestring)):
142 # compound type (union, array, record)
143 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        primary["secondaryFiles"] = []
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(sf.get("required"), context=primary)

            if fsaccess.exists(sfpath):
                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

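# Illustrative sketch (comments only, not executed): secondaryFiles patterns
# are applied with cwltool's substitute(), which appends the pattern to the
# primary location and strips one extension for each leading "^". The Keep
# locations below are hypothetical.
#
#   substitute("keep:pdh+1/reads.bam", ".bai")     # -> "keep:pdh+1/reads.bam.bai"
#   substitute("keep:pdh+1/reads.bam", "^.fasta")  # -> "keep:pdh+1/reads.fasta"
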
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references. Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext())
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

355 loc = p.get("location")
356 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
357 p["location"] = mapper.mapper(p["location"]).resolved
363 if collectionUUID in p:
364 uuid = p[collectionUUID]
365 if uuid not in uuid_map:
366 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
367 "Collection uuid %s not found" % uuid)
368 gp = collection_pdh_pattern.match(loc)
369 if gp and uuid_map[uuid] != gp.groups()[0]:
370 # This file entry has both collectionUUID and a PDH
371 # location. If the PDH doesn't match the one returned
372 # the API server, raise an error.
373 raise SourceLine(p, "location", validate.ValidationException).makeError(
374 "Expected collection uuid %s to be %s but API server reported %s" % (
375 uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
377 gp = collection_uuid_pattern.match(loc)
380 uuid = gp.groups()[0]
381 if uuid not in uuid_map:
382 raise SourceLine(p, "location", validate.ValidationException).makeError(
383 "Collection uuid %s not found" % uuid)
384 p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
385 p[collectionUUID] = uuid
387 visit_class(workflowobj, ("File", "Directory"), setloc)
388 visit_class(discovered, ("File", "Directory"), setloc)
    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

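# Illustrative sketch (comments only, not executed): a typical call maps the
# local file references in a loaded tool document into Keep. "runner" and
# "tool" are hypothetical, pre-constructed objects, and "<pdh>" stands for a
# real portable data hash.
#
#   mapper = upload_dependencies(runner, "my-tool dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                False, include_primary=False)
#   mapper.mapper("file:///home/me/ref.fa").resolved  # -> "keep:<pdh>/ref.fa"
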
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want that, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext())
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

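# Example shape (illustrative, hypothetical paths; "<pdh>" stands for a real
# portable data hash): upload_workflow_deps records, per tool document id,
# the rewritten file locations and any discovered secondaryFiles:
#
#   FileUpdates(resolved={"file:///wf/ref.fa": "keep:<pdh>/ref.fa"},
#               secondaryFiles={"keep:<pdh>/ref.fa": [{"class": "File",
#                                                      "location": "keep:<pdh>/ref.fa.fai"}]})
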
def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

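# Illustrative usage (comments only; "runner" and "packed" are hypothetical):
#
#   pdh = upload_workflow_collection(runner, "my-workflow", packed)
#
# Because the lookup above is keyed on portable_data_hash (a content address),
# registering a byte-identical packed workflow again reuses the existing
# collection instead of creating a duplicate.
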
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
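
# Illustrative "record" accepted by Runner.done() (comments only; all values
# are hypothetical):
#
#   {"state": "Complete",
#    "exit_code": 0,
#    "uuid": "zzzzz-xvhdp-0123456789abcde",
#    "log": "<log collection>",
#    "output": "<output collection>"}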