# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

# Standard library modules used later in this file.
import os
import sys
import copy
import json
import logging
import urllib.parse

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess
from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from .context import ArvRuntimeContext
from . import done  # needed for done.logtail() in Runner.done() below

logger = logging.getLogger('arvados.cwl-runner')
def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, filter it out.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
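# Illustrative note (not from the original source): a File literal such as
#     {"class": "File", "basename": "a.txt", "contents": "hello",
#      "location": "_:a-random-uuid"}
# has its anonymous "_:" location removed by trim_anonymous_location() so the
# record stays reproducible when written back out.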
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]
def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
        job=joborder,
        files=[],                      # type: List[Dict[Text, Text]]
        bindings=[],                   # type: List[Dict[Text, Any]]
        schemaDefs={},                 # type: Dict[Text, Dict[Text, Any]]
        names=None,                    # type: Names
        requirements=requirements,     # type: List[Dict[Text, Any]]
        hints=hints,                   # type: List[Dict[Text, Any]]
        resources={},                  # type: Dict[str, int]
        mutation_manager=None,         # type: Optional[MutationManager]
        formatgraph=None,              # type: Optional[Graph]
        make_fs_access=None,           # type: Type[StdFsAccess]
        fs_access=None,                # type: StdFsAccess
        job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
        timeout=runtimeContext.eval_timeout,                     # type: float
        debug=runtimeContext.debug,                              # type: bool
        js_console=runtimeContext.js_console,                    # type: bool
        force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
        loadListing="",                # type: Text
        outdir="",                     # type: Text
        tmpdir="",                     # type: Text
        stagedir="",                   # type: Text
        cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
    )
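# Illustrative sketch (hypothetical values, not part of the original module):
# make_builder() produces a throwaway Builder used only to evaluate CWL
# expressions, e.g.
#
#     builder = make_builder({"threads": 4}, hints=[], requirements=[],
#                            runtimeContext=ArvRuntimeContext(),
#                            metadata={"cwlVersion": "v1.1"})
#     builder.do_eval("$(inputs.threads)")   # -> 4
#
# Fields such as files, bindings and resources are deliberately left empty.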
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None


primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return
    if "secondaryFiles" in inputschema:
        # Set secondaryFiles; it may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                specs.append({"pattern": pattern})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")
        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = sf["basename"]
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                found.append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
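# Illustrative sketch (hypothetical data): given an input schema with
#     secondaryFiles: [{"pattern": ".bai", "required": true}]
# and a primary value
#     {"class": "File", "location": "file:///data/reads.bam"}
# set_secondary() evaluates the pattern, substitute() appends it to give
# "file:///data/reads.bam.bai", and if fsaccess.exists() confirms the file it
# records
#     "secondaryFiles": [{"class": "File", "location": "file:///data/reads.bam.bai"}]
# on the primary and in the "discovered" dict, keyed by the primary location.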
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
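# Illustrative note (hypothetical data): for a tool input like
#     {"id": "#main/reads", "type": "File", "secondaryFiles": [".bai"]}
# and a job order {"reads": {"class": "File", "location": "file:///data/reads.bam"}},
# shortname("#main/reads") is "reads", so discover_secondary_files() passes that
# File value to set_secondary() above to fill in its secondaryFiles.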
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """
    loaded = set()

    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))
    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)
    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)
    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)
    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]

        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True

        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)
    discovered = {}

    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid
    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
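# Illustrative sketch (hypothetical values): after upload_dependencies() the
# returned ArvPathMapper can translate the original local references, e.g.
#     mapper.mapper("file:///home/user/wf/input.txt").resolved
#     # -> "keep:99999999999999999999999999999999+118/input.txt"
# and the corresponding "keep:" locations have already been written back into
# workflowobj in place by setloc().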
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError(
                        "Embedded process object is missing required 'id' field; add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed
def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
                                              stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want it to do that,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])
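    # Illustrative note: the loop above rewrites each copied input schema so
    # that fill_in_defaults() below will not fail on parameters the user did
    # not supply; e.g. a hypothetical input
    #     {"id": "#main/reads", "type": "File"}
    # becomes
    #     {"id": "#main/reads", "type": ["null", "File"]}
    # in inputs_copy (the tool's own schema is left untouched).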
    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)

    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)

    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])


def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map
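# Illustrative sketch (hypothetical values): the merged_map returned by
# upload_workflow_deps() is keyed by document id, e.g.
#     {"file:///home/user/wf/wf.cwl": FileUpdates(
#         resolved={"file:///home/user/wf/data.txt": "keep:99999999999999999999999999999999+59/data.txt"},
#         secondaryFiles={})}
# and packed_workflow() consults it to rewrite embedded locations.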
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
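# Illustrative sketch (hypothetical values):
#     pdh = upload_workflow_collection(arvrunner, "my-workflow", packed)
#     # pdh is a portable data hash string such as
#     # "d41d8cd98f00b204e9800998ecf8427e+0", naming a collection that holds a
#     # single "workflow.cwl" file; identical packed workflows hash to the same
#     # value, so the filter lookup above reuses an existing collection.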
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)
        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self
    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)