# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import copy
import json
import logging
import urllib.parse
from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')
def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, simply delete it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
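
# Minimal usage sketch (assumption: applied via cwltool's visit_class, the same
# traversal helper used throughout this module) -- strip anonymous locations
# from every File and Directory literal in an output object:
#
#   visit_class(outputobj, ("File", "Directory"), trim_anonymous_location)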
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]
def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
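
# Illustrative sketch (hypothetical data, not taken from this module):
# find_defaults calls `op` on every nested dict that carries a "default" key.
#
#   doc = {"inputs": [{"id": "x", "type": "File",
#                      "default": {"class": "File", "location": "blub.txt"}}]}
#   find_defaults(doc, lambda d: print(d["id"]))   # prints "x"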
def make_builder(joborder, hints, requirements, runtimeContext):
    return Builder(job=joborder,
                   files=[],                  # type: List[Dict[Text, Text]]
                   bindings=[],               # type: List[Dict[Text, Any]]
                   schemaDefs={},             # type: Dict[Text, Dict[Text, Any]]
                   names=None,                # type: Names
                   requirements=requirements, # type: List[Dict[Text, Any]]
                   hints=hints,               # type: List[Dict[Text, Any]]
                   resources={},              # type: Dict[str, int]
                   mutation_manager=None,     # type: Optional[MutationManager]
                   formatgraph=None,          # type: Optional[Graph]
                   make_fs_access=None,       # type: Type[StdFsAccess]
                   fs_access=None,            # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,                     # type: float
                   debug=runtimeContext.debug,                              # type: bool
                   js_console=runtimeContext.js_console,                    # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
                   loadListing="",            # type: Text
                   outdir="",                 # type: Text
                   tmpdir="",                 # type: Text
                   stagedir="",               # type: Text
                   )
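
# Usage sketch (mirrors discover_default_secondary_files and upload_job_order
# below): build a throwaway Builder purely to evaluate CWL expressions such as
# secondaryFiles patterns.
#
#   builder = make_builder(job_order, tool.hints, tool.requirements,
#                          ArvRuntimeContext())
#   pattern = builder.do_eval(sf["pattern"], context=primary)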
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None
primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        primary["secondaryFiles"] = []
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(sf.get("required"), context=primary)

            if fsaccess.exists(sfpath):
                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
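
# Example of the pattern expansion performed above (hypothetical values): for a
# primary File at "file:///data/reads.bam", a secondaryFiles pattern of ".bai"
# expands (via cwltool.builder.substitute) to "file:///data/reads.bam.bai",
# while "^.bai" strips the primary's extension first, giving
# "file:///data/reads.bai".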
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}
    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)
    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)
    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})
    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)
    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext())
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)
    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
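
# Hypothetical invocation sketch (the real call sites are upload_workflow_deps
# and upload_job_order below): map everything a tool document references into
# Keep, then look up an individual file's Keep reference.
#
#   mapper = upload_dependencies(arvrunner, "example dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                loadref_run=False, include_primary=False)
#   keep_ref = mapper.mapper("file:///local/input.txt").resolved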
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)

    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash
    tag_git_version(packed)

    return packed
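
# Downstream sketch (grounded in upload_workflow_collection below): the packed
# document is what gets serialized to JSON and stored in Keep as
# "workflow.cwl", e.g.
#
#   packed = packed_workflow(arvrunner, tool, merged_map)
#   pdh = upload_workflow_collection(arvrunner, "my workflow", packed)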
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext())
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
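
# Effect sketch (hypothetical values): after upload_job_order, local paths in
# the input object point at Keep instead of the local filesystem, e.g.
#
#   {"reads": {"class": "File", "location": "file:///data/reads.bam"}}
# becomes something like
#   {"reads": {"class": "File", "location": "keep:<portable data hash>/reads.bam"}}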
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map
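
# Shape sketch of the returned merged_map (illustrative paths only): each
# tool/workflow document id maps to a FileUpdates tuple, e.g.
#
#   {"file:///home/user/wf.cwl":
#        FileUpdates(resolved={"file:///home/user/blub.txt": "keep:<pdh>/blub.txt"},
#                    secondaryFiles={})}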
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
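
# Design note (inferred from the lookup above): the collection is
# content-addressed, so resubmitting an identical packed workflow matches the
# portable_data_hash filter and reuses the existing collection rather than
# saving a duplicate.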
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)
        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass
    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)