1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
14 from functools import partial
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
22 if os.name == "posix" and sys.version_info[0] < 3:
23 import subprocess32 as subprocess
27 from schema_salad.sourceline import SourceLine, cmap
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32 shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.utils import aslist
36 from cwltool.builder import substitute
37 from cwltool.pack import pack
38 from cwltool.update import INTERNAL_VERSION
39 from cwltool.builder import Builder
40 import schema_salad.validate as validate
42 import arvados.collection
43 from .util import collectionUUID
44 import ruamel.yaml as yaml
45 from ruamel.yaml.comments import CommentedMap, CommentedSeq
47 import arvados_cwl.arvdocker
48 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
49 from ._version import __version__
51 from . context import ArvRuntimeContext
53 logger = logging.getLogger('arvados.cwl-runner')
# NOTE(review): this extract is elided (original line numbers skip 57, 61-64, 66-68);
# the statement that actually removes the anonymous 'location' is not visible here —
# confirm against the full file before relying on this body.
55 def trim_anonymous_location(obj):
56 """Remove 'location' field from File and Directory literals.
58 To make internal handling easier, literals are assigned a random id for
59 'location'. However, when writing the record back out, this can break
60 reproducibility. Since it is valid for literals not to have a 'location'
# "_:" is the prefix used for anonymous/blank-node ids assigned to literals.
65 if obj.get("location", "").startswith("_:"):
# Strip derived path fields from a File/Directory record; these are recomputed
# from 'location' and would otherwise leak local filesystem details.
# NOTE(review): loop body is elided in this extract (original lines 71-74 missing) —
# presumably it pops each field from obj; confirm against the full file.
69 def remove_redundant_fields(obj):
70 for field in ("path", "nameext", "nameroot", "dirname"):
# Recursively walk a list/dict document and apply callback `op` wherever a
# "default" entry is found.
# NOTE(review): branch bodies are elided in this extract (original lines 77-78,
# 80-82, 84-85 missing); the recursion/`op` invocation is not visible here.
75 def find_defaults(d, op):
76 if isinstance(d, list):
79 elif isinstance(d, dict):
# viewvalues comes from python-future for py2/py3 dict-view compatibility.
83 for i in viewvalues(d):
# Construct a minimal cwltool Builder sufficient for expression evaluation
# (do_eval) outside of a real job: most collaborator fields are stubbed with
# empty containers / None, and runtime knobs are copied from runtimeContext.
# NOTE(review): the `return Builder(` call header (original lines 87-88) and the
# closing arguments/paren (original lines 109-110) are elided in this extract.
86 def make_builder(joborder, hints, requirements, runtimeContext):
89 files=[], # type: List[Dict[Text, Text]]
90 bindings=[], # type: List[Dict[Text, Any]]
91 schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
92 names=None, # type: Names
93 requirements=requirements, # type: List[Dict[Text, Any]]
94 hints=hints, # type: List[Dict[Text, Any]]
95 resources={}, # type: Dict[str, int]
96 mutation_manager=None, # type: Optional[MutationManager]
97 formatgraph=None, # type: Optional[Graph]
98 make_fs_access=None, # type: Type[StdFsAccess]
99 fs_access=None, # type: StdFsAccess
100 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
101 timeout=runtimeContext.eval_timeout, # type: float
102 debug=runtimeContext.debug, # type: bool
103 js_console=runtimeContext.js_console, # type: bool
104 force_docker_pull=runtimeContext.force_docker_pull, # type: bool
105 loadListing="", # type: Text
106 outdir="", # type: Text
107 tmpdir="", # type: Text
108 stagedir="", # type: Text
# Look up a named schema definition ("type") inside the SchemaDefRequirement
# entries of `reqs`.
# NOTE(review): the iteration header over `reqs` (original line 112) and the
# match/no-match returns (original lines 116-117) are elided in this extract.
111 def search_schemadef(name, reqs):
113 if r["class"] == "SchemaDefRequirement":
114 for sd in r["types"]:
115 if sd["name"] == name:
# CWL primitive type names — used by set_secondary() to stop recursing into
# schema references that are not compound/user-defined types.
# NOTE(review): the tuple is truncated in this extract (original lines 121-122,
# with the remaining names and closing paren, are missing).
119 primitive_types_set = frozenset(("null", "boolean", "int", "long",
120 "float", "double", "string", "record",
# Recursively walk an input schema and its corresponding job-order value
# ("primary"), evaluating secondaryFiles patterns and attaching discovered
# secondary File entries to the primary File record. Newly discovered entries
# are also recorded in the `discovered` dict (location -> secondaryFiles) when
# one is supplied, so they can be uploaded later.
# NOTE(review): this extract is elided — several original lines (e.g. 128-129,
# 132-136, 140, 145, 148, 150, 153, 155, 158, 160-161, 163, 165, 169, 171,
# 175-176, 179, 182, 185) are missing, including returns, else branches, and
# the schemadef-substitution logic after search_schemadef; code below is the
# visible subset only.
123 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
124 if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
125 # union type, collect all possible secondaryFiles
126 for i in inputschema:
127 set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
# A bare string schema is a reference to a named type; resolve it via
# SchemaDefRequirement (requirements take precedence via reversed()).
130 if isinstance(inputschema, basestring):
131 sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
137 if "secondaryFiles" in inputschema:
138 # set secondaryFiles, may be inherited by compound types.
139 secondaryspec = inputschema["secondaryFiles"]
141 if (isinstance(inputschema["type"], (Mapping, Sequence)) and
142 not isinstance(inputschema["type"], basestring)):
143 # compound type (union, array, record)
144 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
146 elif (inputschema["type"] == "record" and
147 isinstance(primary, Mapping)):
149 # record type, find secondary files associated with fields.
151 for f in inputschema["fields"]:
152 p = primary.get(shortname(f["name"]))
154 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
156 elif (inputschema["type"] == "array" and
157 isinstance(primary, Sequence)):
159 # array type, find secondary files of elements
# NOTE(review): the loop binding `p` over the array elements (original
# lines 160-161) is elided here.
162 set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
164 elif (inputschema["type"] == "File" and
166 isinstance(primary, Mapping) and
167 primary.get("class") == "File" and
168 "secondaryFiles" not in primary):
170 # Found a file, check for secondaryFiles
172 primary["secondaryFiles"] = []
173 for i, sf in enumerate(aslist(secondaryspec)):
# "pattern" may be a CWL expression; evaluate it against the primary file.
174 pattern = builder.do_eval(sf["pattern"], context=primary)
177 sfpath = substitute(primary["location"], pattern)
178 required = builder.do_eval(sf.get("required"), context=primary)
180 if fsaccess.exists(sfpath):
181 primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
# NOTE(review): the `elif required:` guard (original line 182) is elided;
# this raise presumably only fires for required secondary files.
183 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
184 "Required secondary file '%s' does not exist" % sfpath)
186 primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
187 if discovered is not None:
188 discovered[primary["location"]] = primary["secondaryFiles"]
189 elif inputschema["type"] not in primitive_types_set:
190 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    """Walk each input schema and discover secondary files for its job-order value.

    For every schema in ``inputs``, look up the corresponding value in
    ``job_order`` (keyed by the short name of the schema's ``id``).  Values
    that are mappings or sequences are handed to :func:`set_secondary`, which
    attaches/records their secondaryFiles; other values are skipped.
    """
    for schema in inputs:
        value = job_order.get(shortname(schema["id"]))
        if not isinstance(value, (Mapping, Sequence)):
            continue
        set_secondary(fsaccess, builder, schema, None, value, discovered)
# NOTE(review): this extract is heavily elided — many original lines are missing
# (e.g. the `loadref` def header 213-214, the `loaded.add` 218, `else:` branches,
# the uuids/uuid_map initializations 246-247/276, the `while` retry header 281,
# the `setloc` def header 355, and the final `return mapper` 401). The lines
# below are the visible subset only; do not treat gaps as dead code.
198 def upload_dependencies(arvrunner, name, document_loader,
199 workflowobj, uri, loadref_run,
200 include_primary=True, discovered_secondaryfiles=None):
201 """Upload the dependencies of the workflowobj document to Keep.
203 Returns a pathmapper object mapping local paths to keep references. Also
204 does an in-place update of references in "workflowobj".
206 Use scandeps to find $import, $include, $schemas, run, File and Directory
207 fields that represent external references.
209 If workflowobj has an "id" field, this will reload the document to ensure
210 it is scanning the raw document prior to preprocessing.
# --- nested helper: fetch and parse a referenced document (raw, pre-preprocessing)
215 joined = document_loader.fetcher.urljoin(b, u)
216 defrg, _ = urllib.parse.urldefrag(joined)
217 if defrg not in loaded:
219 # Use fetch_text to get raw file (before preprocessing).
220 text = document_loader.fetch_text(defrg)
221 if isinstance(text, bytes):
222 textIO = StringIO(text.decode('utf-8'))
224 textIO = StringIO(text)
225 return yaml.safe_load(textIO)
# "run" references are only followed when loadref_run is requested.
230 loadref_fields = set(("$import", "run"))
232 loadref_fields = set(("$import",))
234 scanobj = workflowobj
235 if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
236 # Need raw file content (before preprocessing) to ensure
237 # that external references in $include and $mixin are captured.
238 scanobj = loadref("", workflowobj["id"])
240 sc_result = scandeps(uri, scanobj,
242 set(("$include", "$schemas", "location")),
243 loadref, urljoin=document_loader.fetcher.urljoin)
# --- nested helper: gather Arvados collection uuids referenced by File/Directory records
248 def collect_uuids(obj):
249 loc = obj.get("location", "")
252 # Collect collection uuids that need to be resolved to
253 # portable data hashes
254 gp = collection_uuid_pattern.match(loc)
256 uuids[gp.groups()[0]] = obj
257 if collectionUUID in obj:
258 uuids[obj[collectionUUID]] = obj
# --- nested helper: gather local/remote files that must be uploaded to Keep
260 def collect_uploads(obj):
261 loc = obj.get("location", "")
265 if sp[0] in ("file", "http", "https"):
266 # Record local files that need to be uploaded,
267 # don't include file literals, keep references, etc.
271 visit_class(workflowobj, ("File", "Directory"), collect_uuids)
272 visit_class(sc_result, ("File", "Directory"), collect_uploads)
274 # Resolve any collection uuids we found to portable data hashes
275 # and assign them to uuid_map
277 fetch_uuids = list(uuids.keys())
279 # For a large number of fetch_uuids, API server may limit
280 # response size, so keep fetching until the API server has nothing
282 lookups = arvrunner.api.collections().list(
283 filters=[["uuid", "in", fetch_uuids]],
285 select=["uuid", "portable_data_hash"]).execute(
286 num_retries=arvrunner.num_retries)
288 if not lookups["items"]:
291 for l in lookups["items"]:
292 uuid_map[l["uuid"]] = l["portable_data_hash"]
294 fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
296 normalizeFilesDirs(sc)
298 if include_primary and "id" in workflowobj:
299 sc.append({"class": "File", "location": workflowobj["id"]})
301 if "$schemas" in workflowobj:
302 for s in workflowobj["$schemas"]:
303 sc.append({"class": "File", "location": s})
# --- nested helper: normalize/validate File defaults before upload
305 def visit_default(obj):
307 def ensure_default_location(f):
308 if "location" not in f and "path" in f:
309 f["location"] = f["path"]
311 if "location" in f and not arvrunner.fs_access.exists(f["location"]):
312 # Doesn't exist, remove from list of dependencies to upload
313 sc[:] = [x for x in sc if x["location"] != f["location"]]
314 # Delete "default" from workflowobj
316 visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
320 find_defaults(workflowobj, visit_default)
# --- nested helper: evaluate default-valued inputs to discover their secondaryFiles
323 def discover_default_secondary_files(obj):
324 builder_job_order = {}
325 for t in obj["inputs"]:
326 builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
327 # Need to create a builder object to evaluate expressions.
328 builder = make_builder(builder_job_order,
329 obj.get("hints", []),
330 obj.get("requirements", []),
332 discover_secondary_files(arvrunner.fs_access,
# Run discovery on a resolved deep copy so the original document is not mutated.
338 copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
339 visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
341 for d in list(discovered):
342 # Only interested in discovered secondaryFiles which are local
343 # files that need to be uploaded.
344 if d.startswith("file:"):
345 sc.extend(discovered[d])
349 mapper = ArvPathMapper(arvrunner, sc, "",
353 single_collection=True)
# --- nested helper (setloc): rewrite each File/Directory location to a keep: reference
356 loc = p.get("location")
357 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
358 p["location"] = mapper.mapper(p["location"]).resolved
364 if collectionUUID in p:
365 uuid = p[collectionUUID]
366 if uuid not in uuid_map:
367 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
368 "Collection uuid %s not found" % uuid)
369 gp = collection_pdh_pattern.match(loc)
370 if gp and uuid_map[uuid] != gp.groups()[0]:
371 # This file entry has both collectionUUID and a PDH
372 # location. If the PDH doesn't match the one returned
373 # the API server, raise an error.
374 raise SourceLine(p, "location", validate.ValidationException).makeError(
375 "Expected collection uuid %s to be %s but API server reported %s" % (
376 uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
378 gp = collection_uuid_pattern.match(loc)
381 uuid = gp.groups()[0]
382 if uuid not in uuid_map:
383 raise SourceLine(p, "location", validate.ValidationException).makeError(
384 "Collection uuid %s not found" % uuid)
385 p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
386 p[collectionUUID] = uuid
388 visit_class(workflowobj, ("File", "Directory"), setloc)
389 visit_class(discovered, ("File", "Directory"), setloc)
391 if discovered_secondaryfiles is not None:
393 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
395 if "$schemas" in workflowobj:
397 for s in workflowobj["$schemas"]:
398 sch.append(mapper.mapper(s).resolved)
399 workflowobj["$schemas"] = sch
# NOTE(review): elided extract — the `if docker_req:`/`else:` branches around
# original lines 409/415 and the subworkflow loop header (original line 418)
# are missing; the arvados/jobs fallback below presumably sits in the `else`.
404 def upload_docker(arvrunner, tool):
405 """Uploads Docker images used in CommandLineTool objects."""
407 if isinstance(tool, CommandLineTool):
408 (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
410 if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
411 # TODO: can be supported by containers API, but not jobs API.
412 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
413 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
414 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
416 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
# Workflows are handled by recursing into each step's embedded tool.
417 elif isinstance(tool, cwltool.workflow.Workflow):
419 upload_docker(arvrunner, s.embedded_tool)
# NOTE(review): elided extract — missing lines include the `rewrites = {}`
# initialization (~427), the missing-'id' guard (437), the list-recursion body
# after 448, the `try:` before the git subprocess call (458), and the final
# visit/return statements (~449-454, 461-465). Visible subset only.
422 def packed_workflow(arvrunner, tool, merged_map):
423 """Create a packed workflow.
425 A "packed" workflow is one where all the components have been combined into a single document."""
428 packed = pack(arvrunner.loadingContext, tool.tool["id"],
429 rewrite_out=rewrites,
430 loader=tool.doc_loader)
# Invert the pack() rewrite map so packed ids can be traced back to the
# original ids used as keys in merged_map.
432 rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
434 def visit(v, cur_id):
435 if isinstance(v, dict):
436 if v.get("class") in ("CommandLineTool", "Workflow"):
438 raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
439 cur_id = rewrite_to_orig.get(v["id"], v["id"])
440 if "location" in v and not v["location"].startswith("keep:"):
441 v["location"] = merged_map[cur_id].resolved[v["location"]]
442 if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
443 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
444 if v.get("class") == "DockerRequirement":
445 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
448 if isinstance(v, list):
# Stamp the packed document with the git commit of a file:// workflow, when
# the directory is a git checkout (errors from git are tolerated).
455 def tag_git_version(packed):
456 if tool.tool["id"].startswith("file://"):
457 path = os.path.dirname(tool.tool["id"][7:])
459 githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
460 except (OSError, subprocess.CalledProcessError):
463 packed["http://schema.org/version"] = githash
# NOTE(review): elided extract — the fill_in_defaults/make_builder/
# discover_secondary_files/upload_dependencies call arguments are truncated
# mid-call, and the trailing `return job_order` (~line 511) is missing.
466 def upload_job_order(arvrunner, name, tool, job_order):
467 """Upload local files referenced in the input object and return updated input
468 object with 'location' updated to the proper keep references.
471 # Make a copy of the job order and set defaults.
472 builder_job_order = copy.copy(job_order)
474 # fill_in_defaults throws an error if there are any
475 # missing required parameters, we don't want it to do that
476 # so make them all optional.
477 inputs_copy = copy.deepcopy(tool.tool["inputs"])
478 for i in inputs_copy:
479 if "null" not in i["type"]:
480 i["type"] = ["null"] + aslist(i["type"])
482 fill_in_defaults(inputs_copy,
485 # Need to create a builder object to evaluate expressions.
486 builder = make_builder(builder_job_order,
490 # Now update job_order with secondaryFiles
491 discover_secondary_files(arvrunner.fs_access,
496 jobmapper = upload_dependencies(arvrunner,
500 job_order.get("id", "#"),
503 if "id" in job_order:
506 # Need to filter this out, gets added by cwltool when providing
507 # parameters on the command line.
508 if "job_order" in job_order:
509 del job_order["job_order"]
# Per-tool record pairing rewritten file locations ("resolved": original
# location -> keep reference) with the secondary files discovered for each
# location ("secondaryFiles").
FileUpdates = namedtuple("FileUpdates", "resolved secondaryFiles")
# Upload Docker images and file dependencies for every tool reachable from
# `tool`, returning (per the elided tail) the accumulated merged_map.
# NOTE(review): elided extract — the merged_map/toolmap initializations
# (original lines ~521-523, 536), several upload_dependencies arguments
# (529-532), and the final `return merged_map` (~542) are missing.
515 def upload_workflow_deps(arvrunner, tool):
516 # Ensure that Docker images needed by this workflow are available
518 upload_docker(arvrunner, tool)
520 document_loader = tool.doc_loader
524 def upload_tool_deps(deptool):
526 discovered_secondaryfiles = {}
527 pm = upload_dependencies(arvrunner,
528 "%s dependencies" % (shortname(deptool["id"])),
533 include_primary=False,
534 discovered_secondaryfiles=discovered_secondaryfiles)
# Re-register the (possibly updated) tool document in the loader index.
535 document_loader.idx[deptool["id"]] = deptool
537 for k,v in pm.items():
538 toolmap[k] = v.resolved
539 merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
541 tool.visit(upload_tool_deps)
# NOTE(review): elided extract — the `try:` line (original line 548) that pairs
# with the `except` below is missing here.
545 def arvados_jobs_image(arvrunner, img):
546 """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
549 return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
# Any failure (docker pull, upload, API) is re-raised with the image name for context.
550 except Exception as e:
551 raise Exception("Docker image %s is not available\n%s" % (img, e) )
# Save a packed workflow document as "workflow.cwl" in a Keep collection and
# return its portable data hash, reusing an existing identical collection when
# one is found.
# NOTE(review): elided extract — the `if exists["items"]:`/`else:` branching
# (original lines ~566-569) around the reuse-vs-save_new paths is missing.
554 def upload_workflow_collection(arvrunner, name, packed):
555 collection = arvados.collection.Collection(api_client=arvrunner.api,
556 keep_client=arvrunner.keep_client,
557 num_retries=arvrunner.num_retries)
558 with collection.open("workflow.cwl", "w") as f:
559 f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
# Look for an existing collection with the same content (PDH) and name prefix.
561 filters = [["portable_data_hash", "=", collection.portable_data_hash()],
562 ["name", "like", name+"%"]]
563 if arvrunner.project_uuid:
564 filters.append(["owner_uuid", "=", arvrunner.project_uuid])
565 exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
568 logger.info("Using collection %s", exists["items"][0]["uuid"])
570 collection.save_new(name=name,
571 owner_uuid=arvrunner.project_uuid,
572 ensure_unique_name=True,
573 num_retries=arvrunner.num_retries)
574 logger.info("Uploaded to %s", collection.manifest_locator())
576 return collection.portable_data_hash()
# NOTE(review): elided extract — missing lines inside this class include the
# `if reuse_req:` guard (~605), the `job()` method's `def` header (~645), the
# bodies of update_pipeline_component (~655), several else/try branches inside
# done() (~659-660, 667, 669, 671, 673-675, 682, 690, 698, 701), and anything
# after the last visible line. Visible subset only; do not assume completeness.
579 class Runner(Process):
580 """Base class for runner processes, which submit an instance of
581 arvados-cwl-runner and wait for the final result."""
583 def __init__(self, runner, updated_tool,
584 tool, loadingContext, enable_reuse,
585 output_name, output_tags, submit_runner_ram=0,
586 name=None, on_error=None, submit_runner_image=None,
587 intermediate_output_ttl=0, merged_map=None,
588 priority=None, secret_store=None,
589 collection_cache_size=256,
590 collection_cache_is_default=True):
# Copy the loading context so mutations here don't leak to the caller.
592 loadingContext = loadingContext.copy()
593 loadingContext.metadata = updated_tool.metadata.copy()
595 super(Runner, self).__init__(updated_tool.tool, loadingContext)
597 self.arvrunner = runner
598 self.embedded_tool = tool
599 self.job_order = None
602 # If reuse is permitted by command line arguments but
603 # disabled by the workflow itself, disable it.
604 reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
606 enable_reuse = reuse_req["enableReuse"]
607 self.enable_reuse = enable_reuse
609 self.final_output = None
610 self.output_name = output_name
611 self.output_tags = output_tags
613 self.on_error = on_error
614 self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
615 self.intermediate_output_ttl = intermediate_output_ttl
616 self.priority = priority
617 self.secret_store = secret_store
618 self.enable_dev = loadingContext.enable_dev
# Baseline resources for the runner container; may be raised by the
# WorkflowRunnerResources hint below or the submit_runner_ram argument.
620 self.submit_runner_cores = 1
621 self.submit_runner_ram = 1024 # default 1 GiB
622 self.collection_cache_size = collection_cache_size
624 runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
625 if runner_resource_req:
626 if runner_resource_req.get("coresMin"):
627 self.submit_runner_cores = runner_resource_req["coresMin"]
628 if runner_resource_req.get("ramMin"):
629 self.submit_runner_ram = runner_resource_req["ramMin"]
630 if runner_resource_req.get("keep_cache") and collection_cache_is_default:
631 self.collection_cache_size = runner_resource_req["keep_cache"]
633 if submit_runner_ram:
634 # Command line / initializer overrides default and/or spec from workflow
635 self.submit_runner_ram = submit_runner_ram
637 if self.submit_runner_ram <= 0:
638 raise Exception("Value of submit-runner-ram must be greater than zero")
640 if self.submit_runner_cores <= 0:
641 raise Exception("Value of submit-runner-cores must be greater than zero")
643 self.merged_map = merged_map or {}
# NOTE(review): these are the tail parameters/annotations of the job() method
# whose `def` line (~645) is elided above.
646 job_order, # type: Mapping[Text, Text]
647 output_callbacks, # type: Callable[[Any, Any], Any]
648 runtimeContext # type: RuntimeContext
649 ): # type: (...) -> Generator[Any, None, None]
650 self.job_order = job_order
651 self._init_job(job_order, runtimeContext)
654 def update_pipeline_component(self, record):
657 def done(self, record):
658 """Base method for handling a completed runner."""
# Map the recorded exit code to a process status; exit code 33 is the
# conventional marker for UnsupportedRequirement (see done() callers).
661 if record["state"] == "Complete":
662 if record.get("exit_code") is not None:
663 if record["exit_code"] == 33:
664 processStatus = "UnsupportedRequirement"
665 elif record["exit_code"] == 0:
666 processStatus = "success"
668 processStatus = "permanentFail"
670 processStatus = "success"
672 processStatus = "permanentFail"
676 if processStatus == "permanentFail":
677 logc = arvados.collection.CollectionReader(record["log"],
678 api_client=self.arvrunner.api,
679 keep_client=self.arvrunner.keep_client,
680 num_retries=self.arvrunner.num_retries)
681 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
683 self.final_output = record["output"]
684 outc = arvados.collection.CollectionReader(self.final_output,
685 api_client=self.arvrunner.api,
686 keep_client=self.arvrunner.keep_client,
687 num_retries=self.arvrunner.num_retries)
688 if "cwl.output.json" in outc:
689 with outc.open("cwl.output.json", "rb") as f:
691 outputs = json.loads(f.read().decode())
# Rewrite relative output paths to keep: references rooted at the
# output collection recorded for this run.
692 def keepify(fileobj):
693 path = fileobj["location"]
694 if not path.startswith("keep:"):
695 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
696 adjustFileObjs(outputs, keepify)
697 adjustDirObjs(outputs, keepify)
699 logger.exception("[%s] While getting final output object", self.name)
700 self.arvrunner.output_callback({}, "permanentFail")
702 self.arvrunner.output_callback(outputs, processStatus)