# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy
from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
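
# make_builder constructs a bare-bones cwltool Builder with just enough
# context to evaluate parameter expressions (for example secondaryFiles
# "pattern" and "required" expressions) outside of an actual job.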
def make_builder(joborder, hints, requirements, runtimeContext):
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,             # type: Names
                 requirements=requirements,  # type: List[Dict[Text, Any]]
                 hints=hints,            # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,  # type: Optional[MutationManager]
                 formatgraph=None,       # type: Optional[Graph]
                 make_fs_access=None,    # type: Type[StdFsAccess]
                 fs_access=None,         # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,                     # type: float
                 debug=runtimeContext.debug,                              # type: bool
                 js_console=runtimeContext.js_console,                    # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                )
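
# Look up a named type in any SchemaDefRequirement found in the given
# hints/requirements list; returns the matching type definition, or None.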
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        primary["secondaryFiles"] = []
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(sf.get("required"), context=primary)

            if fsaccess.exists(sfpath):
                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
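
# Apply set_secondary() to each input parameter whose value in the job order
# is a File or a compound (array/record) value.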
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext())
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want it to do that,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext())
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order


FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
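
# Upload the file dependencies of every tool in the workflow; returns a dict
# mapping each tool id to a FileUpdates tuple of resolved file locations and
# discovered secondaryFiles.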
def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
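
# Save the packed workflow document as "workflow.cwl" in a collection,
# reusing an existing collection with identical content and name when one is
# found, and return its portable data hash.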
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = loadingContext.metadata.copy()
        loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)