1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
14 from functools import partial
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
22 if os.name == "posix" and sys.version_info[0] < 3:
23 import subprocess32 as subprocess
27 from schema_salad.sourceline import SourceLine, cmap
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32 shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
41 import arvados.collection
42 from .util import collectionUUID
43 import ruamel.yaml as yaml
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
50 from . context import ArvRuntimeContext
52 logger = logging.getLogger('arvados.cwl-runner')
54 def trim_anonymous_location(obj):
55 """Remove 'location' field from File and Directory literals.
57 To make internal handling easier, literals are assigned a random id for
58 'location'. However, when writing the record back out, this can break
59 reproducibility. Since it is valid for literals not have a 'location'
64 if obj.get("location", "").startswith("_:"):
68 def remove_redundant_fields(obj):
69 for field in ("path", "nameext", "nameroot", "dirname"):
74 def find_defaults(d, op):
75 if isinstance(d, list):
78 elif isinstance(d, dict):
82 for i in viewvalues(d):
85 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
88 files=[], # type: List[Dict[Text, Text]]
89 bindings=[], # type: List[Dict[Text, Any]]
90 schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
91 names=None, # type: Names
92 requirements=requirements, # type: List[Dict[Text, Any]]
93 hints=hints, # type: List[Dict[Text, Any]]
94 resources={}, # type: Dict[str, int]
95 mutation_manager=None, # type: Optional[MutationManager]
96 formatgraph=None, # type: Optional[Graph]
97 make_fs_access=None, # type: Type[StdFsAccess]
98 fs_access=None, # type: StdFsAccess
99 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
100 timeout=runtimeContext.eval_timeout, # type: float
101 debug=runtimeContext.debug, # type: bool
102 js_console=runtimeContext.js_console, # type: bool
103 force_docker_pull=runtimeContext.force_docker_pull, # type: bool
104 loadListing="", # type: Text
105 outdir="", # type: Text
106 tmpdir="", # type: Text
107 stagedir="", # type: Text
108 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion")
111 def search_schemadef(name, reqs):
113 if r["class"] == "SchemaDefRequirement":
114 for sd in r["types"]:
115 if sd["name"] == name:
119 primitive_types_set = frozenset(("null", "boolean", "int", "long",
120 "float", "double", "string", "record",
123 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
124 if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
125 # union type, collect all possible secondaryFiles
126 for i in inputschema:
127 set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
130 if isinstance(inputschema, basestring):
131 sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
137 if "secondaryFiles" in inputschema:
138 # set secondaryFiles, may be inherited by compound types.
139 secondaryspec = inputschema["secondaryFiles"]
141 if (isinstance(inputschema["type"], (Mapping, Sequence)) and
142 not isinstance(inputschema["type"], basestring)):
143 # compound type (union, array, record)
144 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
146 elif (inputschema["type"] == "record" and
147 isinstance(primary, Mapping)):
149 # record type, find secondary files associated with fields.
151 for f in inputschema["fields"]:
152 p = primary.get(shortname(f["name"]))
154 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
156 elif (inputschema["type"] == "array" and
157 isinstance(primary, Sequence)):
159 # array type, find secondary files of elements
162 set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
164 elif (inputschema["type"] == "File" and
166 isinstance(primary, Mapping) and
167 primary.get("class") == "File" and
168 "secondaryFiles" not in primary):
170 # Found a file, check for secondaryFiles
173 primary["secondaryFiles"] = secondaryspec
174 for i, sf in enumerate(aslist(secondaryspec)):
175 if builder.cwlVersion == "v1.0":
176 pattern = builder.do_eval(sf, context=primary)
178 pattern = builder.do_eval(sf["pattern"], context=primary)
181 if isinstance(pattern, list):
182 specs.extend(pattern)
183 elif isinstance(pattern, dict):
184 specs.append(pattern)
185 elif isinstance(pattern, str):
186 specs.append({"pattern": pattern})
188 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
189 "Expression must return list, object, string or null")
192 for i, sf in enumerate(specs):
193 if isinstance(sf, dict):
194 if sf.get("class") == "File":
195 pattern = sf["basename"]
197 pattern = sf["pattern"]
198 required = sf.get("required")
199 elif isinstance(sf, str):
203 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
204 "Expression must return list, object, string or null")
206 sfpath = substitute(primary["location"], pattern)
207 required = builder.do_eval(required, context=primary)
209 if fsaccess.exists(sfpath):
210 found.append({"location": sfpath, "class": "File"})
212 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
213 "Required secondary file '%s' does not exist" % sfpath)
215 primary["secondaryFiles"] = cmap(found)
216 if discovered is not None:
217 discovered[primary["location"]] = primary["secondaryFiles"]
218 elif inputschema["type"] not in primitive_types_set:
219 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
221 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
222 for inputschema in inputs:
223 primary = job_order.get(shortname(inputschema["id"]))
224 if isinstance(primary, (Mapping, Sequence)):
225 set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
227 def upload_dependencies(arvrunner, name, document_loader,
228 workflowobj, uri, loadref_run,
229 include_primary=True, discovered_secondaryfiles=None):
230 """Upload the dependencies of the workflowobj document to Keep.
232 Returns a pathmapper object mapping local paths to keep references. Also
233 does an in-place update of references in "workflowobj".
235 Use scandeps to find $import, $include, $schemas, run, File and Directory
236 fields that represent external references.
238 If workflowobj has an "id" field, this will reload the document to ensure
239 it is scanning the raw document prior to preprocessing.
244 joined = document_loader.fetcher.urljoin(b, u)
245 defrg, _ = urllib.parse.urldefrag(joined)
246 if defrg not in loaded:
248 # Use fetch_text to get raw file (before preprocessing).
249 text = document_loader.fetch_text(defrg)
250 if isinstance(text, bytes):
251 textIO = StringIO(text.decode('utf-8'))
253 textIO = StringIO(text)
254 return yaml.safe_load(textIO)
259 loadref_fields = set(("$import", "run"))
261 loadref_fields = set(("$import",))
263 scanobj = workflowobj
264 if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
265 # Need raw file content (before preprocessing) to ensure
266 # that external references in $include and $mixin are captured.
267 scanobj = loadref("", workflowobj["id"])
271 sc_result = scandeps(uri, scanobj,
273 set(("$include", "$schemas", "location")),
274 loadref, urljoin=document_loader.fetcher.urljoin)
279 def collect_uuids(obj):
280 loc = obj.get("location", "")
283 # Collect collection uuids that need to be resolved to
284 # portable data hashes
285 gp = collection_uuid_pattern.match(loc)
287 uuids[gp.groups()[0]] = obj
288 if collectionUUID in obj:
289 uuids[obj[collectionUUID]] = obj
291 def collect_uploads(obj):
292 loc = obj.get("location", "")
296 if sp[0] in ("file", "http", "https"):
297 # Record local files than need to be uploaded,
298 # don't include file literals, keep references, etc.
302 visit_class(workflowobj, ("File", "Directory"), collect_uuids)
303 visit_class(sc_result, ("File", "Directory"), collect_uploads)
305 # Resolve any collection uuids we found to portable data hashes
306 # and assign them to uuid_map
308 fetch_uuids = list(uuids.keys())
310 # For a large number of fetch_uuids, API server may limit
311 # response size, so keep fetching from API server has nothing
313 lookups = arvrunner.api.collections().list(
314 filters=[["uuid", "in", fetch_uuids]],
316 select=["uuid", "portable_data_hash"]).execute(
317 num_retries=arvrunner.num_retries)
319 if not lookups["items"]:
322 for l in lookups["items"]:
323 uuid_map[l["uuid"]] = l["portable_data_hash"]
325 fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
327 normalizeFilesDirs(sc)
329 if include_primary and "id" in workflowobj:
330 sc.append({"class": "File", "location": workflowobj["id"]})
332 if "$schemas" in workflowobj:
333 for s in workflowobj["$schemas"]:
334 sc.append({"class": "File", "location": s})
336 def visit_default(obj):
338 def ensure_default_location(f):
339 if "location" not in f and "path" in f:
340 f["location"] = f["path"]
342 if "location" in f and not arvrunner.fs_access.exists(f["location"]):
343 # Doesn't exist, remove from list of dependencies to upload
344 sc[:] = [x for x in sc if x["location"] != f["location"]]
345 # Delete "default" from workflowobj
347 visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
351 find_defaults(workflowobj, visit_default)
354 def discover_default_secondary_files(obj):
355 builder_job_order = {}
356 for t in obj["inputs"]:
357 builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
358 # Need to create a builder object to evaluate expressions.
359 builder = make_builder(builder_job_order,
360 obj.get("hints", []),
361 obj.get("requirements", []),
364 discover_secondary_files(arvrunner.fs_access,
370 copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
371 visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
373 for d in list(discovered):
374 # Only interested in discovered secondaryFiles which are local
375 # files that need to be uploaded.
376 if d.startswith("file:"):
377 sc.extend(discovered[d])
381 mapper = ArvPathMapper(arvrunner, sc, "",
385 single_collection=True)
388 loc = p.get("location")
389 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
390 p["location"] = mapper.mapper(p["location"]).resolved
396 if collectionUUID in p:
397 uuid = p[collectionUUID]
398 if uuid not in uuid_map:
399 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
400 "Collection uuid %s not found" % uuid)
401 gp = collection_pdh_pattern.match(loc)
402 if gp and uuid_map[uuid] != gp.groups()[0]:
403 # This file entry has both collectionUUID and a PDH
404 # location. If the PDH doesn't match the one returned
405 # the API server, raise an error.
406 raise SourceLine(p, "location", validate.ValidationException).makeError(
407 "Expected collection uuid %s to be %s but API server reported %s" % (
408 uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
410 gp = collection_uuid_pattern.match(loc)
413 uuid = gp.groups()[0]
414 if uuid not in uuid_map:
415 raise SourceLine(p, "location", validate.ValidationException).makeError(
416 "Collection uuid %s not found" % uuid)
417 p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
418 p[collectionUUID] = uuid
420 visit_class(workflowobj, ("File", "Directory"), setloc)
421 visit_class(discovered, ("File", "Directory"), setloc)
423 if discovered_secondaryfiles is not None:
425 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
427 if "$schemas" in workflowobj:
429 for s in workflowobj["$schemas"]:
431 sch.append(mapper.mapper(s).resolved)
432 workflowobj["$schemas"] = sch
437 def upload_docker(arvrunner, tool):
438 """Uploads Docker images used in CommandLineTool objects."""
440 if isinstance(tool, CommandLineTool):
441 (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
443 if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
444 # TODO: can be supported by containers API, but not jobs API.
445 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
446 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
447 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
448 arvrunner.runtimeContext.force_docker_pull,
449 arvrunner.runtimeContext.tmp_outdir_prefix)
451 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
452 True, arvrunner.project_uuid,
453 arvrunner.runtimeContext.force_docker_pull,
454 arvrunner.runtimeContext.tmp_outdir_prefix)
455 elif isinstance(tool, cwltool.workflow.Workflow):
457 upload_docker(arvrunner, s.embedded_tool)
460 def packed_workflow(arvrunner, tool, merged_map):
461 """Create a packed workflow.
463 A "packed" workflow is one where all the components have been combined into a single document."""
466 packed = pack(arvrunner.loadingContext, tool.tool["id"],
467 rewrite_out=rewrites,
468 loader=tool.doc_loader)
470 rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
472 def visit(v, cur_id):
473 if isinstance(v, dict):
474 if v.get("class") in ("CommandLineTool", "Workflow"):
475 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
476 raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
478 cur_id = rewrite_to_orig.get(v["id"], v["id"])
479 if "path" in v and "location" not in v:
480 v["location"] = v["path"]
482 if "location" in v and not v["location"].startswith("keep:"):
483 v["location"] = merged_map[cur_id].resolved[v["location"]]
484 if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
485 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
486 if v.get("class") == "DockerRequirement":
487 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
488 arvrunner.project_uuid,
489 arvrunner.runtimeContext.force_docker_pull,
490 arvrunner.runtimeContext.tmp_outdir_prefix)
493 if isinstance(v, list):
500 def tag_git_version(packed):
501 if tool.tool["id"].startswith("file://"):
502 path = os.path.dirname(tool.tool["id"][7:])
504 githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
505 except (OSError, subprocess.CalledProcessError):
508 packed["http://schema.org/version"] = githash
511 def upload_job_order(arvrunner, name, tool, job_order):
512 """Upload local files referenced in the input object and return updated input
513 object with 'location' updated to the proper keep references.
516 # Make a copy of the job order and set defaults.
517 builder_job_order = copy.copy(job_order)
519 # fill_in_defaults throws an error if there are any
520 # missing required parameters, we don't want it to do that
521 # so make them all optional.
522 inputs_copy = copy.deepcopy(tool.tool["inputs"])
523 for i in inputs_copy:
524 if "null" not in i["type"]:
525 i["type"] = ["null"] + aslist(i["type"])
527 fill_in_defaults(inputs_copy,
530 # Need to create a builder object to evaluate expressions.
531 builder = make_builder(builder_job_order,
536 # Now update job_order with secondaryFiles
537 discover_secondary_files(arvrunner.fs_access,
542 jobmapper = upload_dependencies(arvrunner,
546 job_order.get("id", "#"),
549 if "id" in job_order:
552 # Need to filter this out, gets added by cwltool when providing
553 # parameters on the command line.
554 if "job_order" in job_order:
555 del job_order["job_order"]
559 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
561 def upload_workflow_deps(arvrunner, tool):
562 # Ensure that Docker images needed by this workflow are available
564 upload_docker(arvrunner, tool)
566 document_loader = tool.doc_loader
570 def upload_tool_deps(deptool):
572 discovered_secondaryfiles = {}
573 pm = upload_dependencies(arvrunner,
574 "%s dependencies" % (shortname(deptool["id"])),
579 include_primary=False,
580 discovered_secondaryfiles=discovered_secondaryfiles)
581 document_loader.idx[deptool["id"]] = deptool
583 for k,v in pm.items():
584 toolmap[k] = v.resolved
585 merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
587 tool.visit(upload_tool_deps)
591 def arvados_jobs_image(arvrunner, img):
592 """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
595 return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
596 arvrunner.runtimeContext.force_docker_pull,
597 arvrunner.runtimeContext.tmp_outdir_prefix)
598 except Exception as e:
599 raise Exception("Docker image %s is not available\n%s" % (img, e) )
602 def upload_workflow_collection(arvrunner, name, packed):
603 collection = arvados.collection.Collection(api_client=arvrunner.api,
604 keep_client=arvrunner.keep_client,
605 num_retries=arvrunner.num_retries)
606 with collection.open("workflow.cwl", "w") as f:
607 f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
609 filters = [["portable_data_hash", "=", collection.portable_data_hash()],
610 ["name", "like", name+"%"]]
611 if arvrunner.project_uuid:
612 filters.append(["owner_uuid", "=", arvrunner.project_uuid])
613 exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
616 logger.info("Using collection %s", exists["items"][0]["uuid"])
618 collection.save_new(name=name,
619 owner_uuid=arvrunner.project_uuid,
620 ensure_unique_name=True,
621 num_retries=arvrunner.num_retries)
622 logger.info("Uploaded to %s", collection.manifest_locator())
624 return collection.portable_data_hash()
627 class Runner(Process):
628 """Base class for runner processes, which submit an instance of
629 arvados-cwl-runner and wait for the final result."""
631 def __init__(self, runner, updated_tool,
632 tool, loadingContext, enable_reuse,
633 output_name, output_tags, submit_runner_ram=0,
634 name=None, on_error=None, submit_runner_image=None,
635 intermediate_output_ttl=0, merged_map=None,
636 priority=None, secret_store=None,
637 collection_cache_size=256,
638 collection_cache_is_default=True):
640 loadingContext = loadingContext.copy()
641 loadingContext.metadata = updated_tool.metadata.copy()
643 super(Runner, self).__init__(updated_tool.tool, loadingContext)
645 self.arvrunner = runner
646 self.embedded_tool = tool
647 self.job_order = None
650 # If reuse is permitted by command line arguments but
651 # disabled by the workflow itself, disable it.
652 reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
654 enable_reuse = reuse_req["enableReuse"]
655 self.enable_reuse = enable_reuse
657 self.final_output = None
658 self.output_name = output_name
659 self.output_tags = output_tags
661 self.on_error = on_error
662 self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
663 self.intermediate_output_ttl = intermediate_output_ttl
664 self.priority = priority
665 self.secret_store = secret_store
666 self.enable_dev = loadingContext.enable_dev
668 self.submit_runner_cores = 1
669 self.submit_runner_ram = 1024 # defaut 1 GiB
670 self.collection_cache_size = collection_cache_size
672 runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
673 if runner_resource_req:
674 if runner_resource_req.get("coresMin"):
675 self.submit_runner_cores = runner_resource_req["coresMin"]
676 if runner_resource_req.get("ramMin"):
677 self.submit_runner_ram = runner_resource_req["ramMin"]
678 if runner_resource_req.get("keep_cache") and collection_cache_is_default:
679 self.collection_cache_size = runner_resource_req["keep_cache"]
681 if submit_runner_ram:
682 # Command line / initializer overrides default and/or spec from workflow
683 self.submit_runner_ram = submit_runner_ram
685 if self.submit_runner_ram <= 0:
686 raise Exception("Value of submit-runner-ram must be greater than zero")
688 if self.submit_runner_cores <= 0:
689 raise Exception("Value of submit-runner-cores must be greater than zero")
691 self.merged_map = merged_map or {}
694 job_order, # type: Mapping[Text, Text]
695 output_callbacks, # type: Callable[[Any, Any], Any]
696 runtimeContext # type: RuntimeContext
697 ): # type: (...) -> Generator[Any, None, None]
698 self.job_order = job_order
699 self._init_job(job_order, runtimeContext)
702 def update_pipeline_component(self, record):
705 def done(self, record):
706 """Base method for handling a completed runner."""
709 if record["state"] == "Complete":
710 if record.get("exit_code") is not None:
711 if record["exit_code"] == 33:
712 processStatus = "UnsupportedRequirement"
713 elif record["exit_code"] == 0:
714 processStatus = "success"
716 processStatus = "permanentFail"
718 processStatus = "success"
720 processStatus = "permanentFail"
724 if processStatus == "permanentFail":
725 logc = arvados.collection.CollectionReader(record["log"],
726 api_client=self.arvrunner.api,
727 keep_client=self.arvrunner.keep_client,
728 num_retries=self.arvrunner.num_retries)
729 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
731 self.final_output = record["output"]
732 outc = arvados.collection.CollectionReader(self.final_output,
733 api_client=self.arvrunner.api,
734 keep_client=self.arvrunner.keep_client,
735 num_retries=self.arvrunner.num_retries)
736 if "cwl.output.json" in outc:
737 with outc.open("cwl.output.json", "rb") as f:
739 outputs = json.loads(f.read().decode())
740 def keepify(fileobj):
741 path = fileobj["location"]
742 if not path.startswith("keep:"):
743 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
744 adjustFileObjs(outputs, keepify)
745 adjustDirObjs(outputs, keepify)
747 logger.exception("[%s] While getting final output object", self.name)
748 self.arvrunner.output_callback({}, "permanentFail")
750 self.arvrunner.output_callback(outputs, processStatus)