# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
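
# Illustrative example (not part of the original source): a File literal such as
#   {"class": "File", "location": "_:af35e9c2-hypothetical-id", "basename": "in.txt", "contents": "hello"}
# would have its synthetic "_:" location removed by trim_anonymous_location(),
# leaving only the literal's contents and basename in the written-out record.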

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext):
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,             # type: Names
                 requirements=requirements,        # type: List[Dict[Text, Any]]
                 hints=hints,            # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,  # type: Optional[MutationManager]
                 formatgraph=None,       # type: Optional[Graph]
                 make_fs_access=None,    # type: Type[StdFsAccess]
                 fs_access=None,         # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,                     # type: float
                 debug=runtimeContext.debug,                              # type: bool
                 js_console=runtimeContext.js_console,                    # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                )

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if isinstance(inputschema["type"], Mapping):
        # compound type (array or record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        primary["secondaryFiles"] = []
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(sf["required"], context=primary)

            if fsaccess.exists(sfpath):
                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
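
# Illustrative sketch (not from the original source): for a primary File located
# at "keep:<pdh>/reads.bam" and a secondaryFiles pattern of ".bai", substitute()
# appends the suffix to yield "keep:<pdh>/reads.bam.bai"; a leading "^" in the
# pattern (e.g. "^.bai") would first strip the primary file's extension.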

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
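
# Examples of locations these patterns are meant to match (hypothetical values):
#   collection_uuid_pattern: "keep:zzzzz-4zz18-0123456789abcde/dir/file.txt"
#   collection_pdh_pattern:  "keep:99999999999999999999999999999999+99/dir/file.txt"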

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext())
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
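
# A minimal usage sketch (illustrative only; argument values are hypothetical):
#
#   mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                False, include_primary=False)
#
# Each local "file:..." reference found in the document is uploaded to a Keep
# collection and can then be resolved with mapper.mapper(loc).resolved.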

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)

    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash

    tag_git_version(packed)
    return packed

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)
    fill_in_defaults(tool.tool["inputs"],
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext())
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
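
# Illustrative sketch (not from the original source) of the merged_map structure
# built below: each tool document id maps to a FileUpdates tuple, e.g.
#   {"file:///home/user/wf.cwl": FileUpdates(
#        resolved={"file:///home/user/script.py": "keep:<pdh>/script.py"},
#        secondaryFiles={})}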

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = loadingContext.metadata.copy()
        loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
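
# Illustrative example (not from the original source): if record["output"] is the
# hypothetical collection "9f99999999999999999999999999999f+250", keepify() above
# rewrites a relative output location such as "subdir/result.txt" to
# "keep:9f99999999999999999999999999999f+250/subdir/result.txt".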