# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import copy
import json
import logging
import urllib.parse
from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')
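
# The helpers below implement the client side of workflow submission: scanning
# a CWL document for File, Directory and Docker dependencies, staging them into
# Keep, rewriting references to "keep:" locations, and (in the Runner base
# class at the bottom of this file) tracking a submitted runner process until
# it produces a final output collection.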

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
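
# Illustrative example (hypothetical data) for trim_anonymous_location() above:
# a literal such as
#   {"class": "File", "basename": "a.txt", "contents": "hello", "location": "_:de4ab..."}
# has its synthetic "_:" location dropped, leaving
#   {"class": "File", "basename": "a.txt", "contents": "hello"}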

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext):
    return Builder(
        job=joborder,
        files=[],                       # type: List[Dict[Text, Text]]
        bindings=[],                    # type: List[Dict[Text, Any]]
        schemaDefs={},                  # type: Dict[Text, Dict[Text, Any]]
        names=None,                     # type: Names
        requirements=requirements,      # type: List[Dict[Text, Any]]
        hints=hints,                    # type: List[Dict[Text, Any]]
        resources={},                   # type: Dict[str, int]
        mutation_manager=None,          # type: Optional[MutationManager]
        formatgraph=None,               # type: Optional[Graph]
        make_fs_access=None,            # type: Type[StdFsAccess]
        fs_access=None,                 # type: StdFsAccess
        job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
        timeout=runtimeContext.eval_timeout,                     # type: float
        debug=runtimeContext.debug,                              # type: bool
        js_console=runtimeContext.js_console,                    # type: bool
        force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
        loadListing="",                 # type: Text
        outdir="",                      # type: Text
        tmpdir="",                      # type: Text
        stagedir="",                    # type: Text
    )
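
# Note on make_builder() above: the Builder is only used to evaluate parameter
# references and expressions (for example secondaryFiles patterns), so most of
# its fields are intentionally left empty or None rather than describing a real
# job.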

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None


primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        primary["secondaryFiles"] = []
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(sf.get("required"), context=primary)

            if fsaccess.exists(sfpath):
                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
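
# Illustrative example (hypothetical data) for set_secondary() above: given an
# input schema entry
#   {"type": "File", "secondaryFiles": [{"pattern": ".bai", "required": True}]}
# and a primary value {"class": "File", "location": "keep:.../sample.bam"},
# the pattern evaluates to ".bai", substitute() produces ".../sample.bam.bai",
# and if that path exists it is appended to primary["secondaryFiles"]; if it is
# missing and required, a ValidationException is raised.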

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """
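
    # Outline of the steps below: scan the document for external references,
    # split them into things that must be uploaded (file:/http:/https:) versus
    # existing Keep references, resolve collection UUIDs to portable data
    # hashes, discover secondaryFiles declared on defaults, upload everything
    # through a single ArvPathMapper collection, and finally rewrite locations
    # in workflowobj (and any discovered secondaryFiles) in place.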
    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)
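
    # For example (illustrative): "file:///home/me/input.txt" is queued for
    # upload, "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/input.txt" is queued for
    # UUID-to-PDH resolution, and a "_:" file literal is left alone.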

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext())
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid
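
    # Illustrative effect of setloc(): a local reference such as
    # "file:///home/me/input.txt" becomes the "keep:..." path assigned by the
    # mapper, while "keep:<collection uuid>/input.txt" is rewritten to
    # "keep:<portable data hash>/input.txt" with the original uuid preserved in
    # the collectionUUID field.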

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)

    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash

    tag_git_version(packed)

    return packed

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want it to do that, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext())
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
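
# merged_map, built by upload_workflow_deps() below, maps each tool document id
# to a FileUpdates tuple: "resolved" maps original file locations to their
# uploaded keep: locations, and "secondaryFiles" records any secondaryFiles
# discovered for those locations.  packed_workflow() above consumes this
# mapping to rewrite the packed document.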

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map
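
# Minimal usage sketch (assuming a runner object exposing the api, keep_client
# and fs_access attributes used above, and a loaded cwltool tool object):
#
#   merged_map = upload_workflow_deps(arvrunner, tool)
#   packed = packed_workflow(arvrunner, tool, merged_map)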

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
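
# Because collections are content-addressed, upload_workflow_collection() above
# reuses an existing collection in the target project when one with the same
# portable_data_hash and a name starting with the requested name already
# exists; either way the caller receives the portable data hash of the packed
# workflow.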

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
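
    # Precedence for the runner's own resources: built-in defaults (1 core,
    # 1024 MiB RAM), then an http://arvados.org/cwl#WorkflowRunnerResources
    # hint in the workflow, then an explicit submit_runner_ram argument (for
    # example from the command line), which takes priority.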

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    # Exit code 33 is the runner's convention for signalling an
                    # unsupported CWL requirement.
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)