1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
13 from functools import partial
17 from collections import namedtuple
18 from io import StringIO
19 from typing import Mapping, Sequence
21 if os.name == "posix" and sys.version_info[0] < 3:
22 import subprocess32 as subprocess
26 from schema_salad.sourceline import SourceLine, cmap
28 from cwltool.command_line_tool import CommandLineTool
29 import cwltool.workflow
30 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
31 shortname, Process, fill_in_defaults)
32 from cwltool.load_tool import fetch_document
33 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
34 from cwltool.utils import aslist
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
41 import arvados.collection
42 from .util import collectionUUID
43 import ruamel.yaml as yaml
45 import arvados_cwl.arvdocker
46 from .pathmapper import ArvPathMapper, trim_listing
47 from ._version import __version__
49 from .context import ArvRuntimeContext
51 logger = logging.getLogger('arvados.cwl-runner')
53 def trim_anonymous_location(obj):
54 """Remove 'location' field from File and Directory literals.
56 To make internal handling easier, literals are assigned a random id for
57 'location'. However, when writing the record back out, this can break
58 reproducibility. Since it is valid for literals not to have a 'location', remove it.
63 if obj.get("location", "").startswith("_:"):
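# Illustrative example (not part of the original source): a File literal such as
#   {"class": "File", "location": "_:d6fbd9e0-example", "basename": "greeting.txt", "contents": "hello"}
# carries a synthetic "_:" location assigned during loading; the check above
# removes that field so the record written back out stays reproducible.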
67 def remove_redundant_fields(obj):
68 for field in ("path", "nameext", "nameroot", "dirname"):
73 def find_defaults(d, op):
74 if isinstance(d, list):
77 elif isinstance(d, dict):
81 for i in viewvalues(d):
84 def make_builder(joborder, hints, requirements, runtimeContext):
87 files=[], # type: List[Dict[Text, Text]]
88 bindings=[], # type: List[Dict[Text, Any]]
89 schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
90 names=None, # type: Names
91 requirements=requirements, # type: List[Dict[Text, Any]]
92 hints=hints, # type: List[Dict[Text, Any]]
93 resources={}, # type: Dict[str, int]
94 mutation_manager=None, # type: Optional[MutationManager]
95 formatgraph=None, # type: Optional[Graph]
96 make_fs_access=None, # type: Type[StdFsAccess]
97 fs_access=None, # type: StdFsAccess
98 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
99 timeout=runtimeContext.eval_timeout, # type: float
100 debug=runtimeContext.debug, # type: bool
101 js_console=runtimeContext.js_console, # type: bool
102 force_docker_pull=runtimeContext.force_docker_pull, # type: bool
103 loadListing="", # type: Text
104 outdir="", # type: Text
105 tmpdir="", # type: Text
106 stagedir="", # type: Text
110 def set_secondary(fsaccess, builder, inputschema, primary, discovered):
111 if isinstance(primary, Mapping) and primary.get("class") == "File":
112 if "secondaryFiles" not in primary:
113 primary["secondaryFiles"] = []
114 for i, sf in enumerate(inputschema["secondaryFiles"]):
115 pattern = builder.do_eval(sf["pattern"], context=primary)
118 sfpath = substitute(primary["location"], pattern)
119 required = builder.do_eval(sf["required"], context=primary)
121 if fsaccess.exists(sfpath):
122 primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
124 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
125 "Required secondary file '%s' does not exist" % sfpath)
127 primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
128 if discovered is not None:
129 discovered[primary["location"]] = primary["secondaryFiles"]
130 elif isinstance(primary, Sequence):
132 set_secondary(fsaccess, builder, inputschema, e, discovered)
134 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
135 for inputschema in inputs:
136 primary = job_order.get(shortname(inputschema["id"]))
137 if isinstance(primary, (Mapping, Sequence)) and inputschema.get("secondaryFiles"):
138 set_secondary(fsaccess, builder, inputschema, primary, discovered)
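# Illustrative sketch (not in the original source) of how the secondaryFiles
# pattern expansion above behaves, using cwltool's substitute():
#   primary "location" = "keep:<pdh>/reads.bam", pattern ".bai"
#       -> "keep:<pdh>/reads.bam.bai"
#   a pattern starting with "^" (e.g. "^.bai") first strips the primary's
#   extension -> "keep:<pdh>/reads.bai"
# Patterns are passed through builder.do_eval() first, so they may also be
# CWL expressions.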
140 collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
141 collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
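# Illustrative matches for the two patterns above (identifiers are made up):
#   "keep:zzzzz-4zz18-0123456789abcde/dir/a.txt"        matches collection_uuid_pattern
#   "keep:d41d8cd98f00b204e9800998ecf8427e+0/dir/a.txt" matches collection_pdh_pattern
# upload_dependencies() below uses these to find references that still need to
# be resolved from a collection UUID to a portable data hash.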
143 def upload_dependencies(arvrunner, name, document_loader,
144 workflowobj, uri, loadref_run,
145 include_primary=True, discovered_secondaryfiles=None):
146 """Upload the dependencies of the workflowobj document to Keep.
148 Returns a pathmapper object mapping local paths to keep references. Also
149 does an in-place update of references in "workflowobj".
151 Use scandeps to find $import, $include, $schemas, run, File and Directory
152 fields that represent external references.
154 If workflowobj has an "id" field, this will reload the document to ensure
155 it is scanning the raw document prior to preprocessing.
160 joined = document_loader.fetcher.urljoin(b, u)
161 defrg, _ = urllib.parse.urldefrag(joined)
162 if defrg not in loaded:
164 # Use fetch_text to get raw file (before preprocessing).
165 text = document_loader.fetch_text(defrg)
166 if isinstance(text, bytes):
167 textIO = StringIO(text.decode('utf-8'))
169 textIO = StringIO(text)
170 return yaml.safe_load(textIO)
175 loadref_fields = set(("$import", "run"))
177 loadref_fields = set(("$import",))
179 scanobj = workflowobj
180 if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
181 # Need raw file content (before preprocessing) to ensure
182 # that external references in $include and $mixin are captured.
183 scanobj = loadref("", workflowobj["id"])
185 sc_result = scandeps(uri, scanobj,
187 set(("$include", "$schemas", "location")),
188 loadref, urljoin=document_loader.fetcher.urljoin)
193 def collect_uuids(obj):
194 loc = obj.get("location", "")
197 # Collect collection uuids that need to be resolved to
198 # portable data hashes
199 gp = collection_uuid_pattern.match(loc)
201 uuids[gp.groups()[0]] = obj
202 if collectionUUID in obj:
203 uuids[obj[collectionUUID]] = obj
205 def collect_uploads(obj):
206 loc = obj.get("location", "")
210 if sp[0] in ("file", "http", "https"):
211 # Record local files that need to be uploaded,
212 # don't include file literals, keep references, etc.
216 visit_class(workflowobj, ("File", "Directory"), collect_uuids)
217 visit_class(sc_result, ("File", "Directory"), collect_uploads)
219 # Resolve any collection uuids we found to portable data hashes
220 # and assign them to uuid_map
222 fetch_uuids = list(uuids.keys())
224 # For a large number of fetch_uuids, API server may limit
225 # response size, so keep fetching until the API server has nothing more to return.
227 lookups = arvrunner.api.collections().list(
228 filters=[["uuid", "in", fetch_uuids]],
230 select=["uuid", "portable_data_hash"]).execute(
231 num_retries=arvrunner.num_retries)
233 if not lookups["items"]:
236 for l in lookups["items"]:
237 uuid_map[l["uuid"]] = l["portable_data_hash"]
239 fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
241 normalizeFilesDirs(sc)
243 if include_primary and "id" in workflowobj:
244 sc.append({"class": "File", "location": workflowobj["id"]})
246 if "$schemas" in workflowobj:
247 for s in workflowobj["$schemas"]:
248 sc.append({"class": "File", "location": s})
250 def visit_default(obj):
252 def ensure_default_location(f):
253 if "location" not in f and "path" in f:
254 f["location"] = f["path"]
256 if "location" in f and not arvrunner.fs_access.exists(f["location"]):
257 # Doesn't exist, remove from list of dependencies to upload
258 sc[:] = [x for x in sc if x["location"] != f["location"]]
259 # Delete "default" from workflowobj
261 visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
265 find_defaults(workflowobj, visit_default)
268 def discover_default_secondary_files(obj):
269 builder_job_order = {}
270 for t in obj["inputs"]:
271 builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
272 # Need to create a builder object to evaluate expressions.
273 builder = make_builder(builder_job_order,
274 obj.get("hints", []),
275 obj.get("requirements", []),
277 discover_secondary_files(arvrunner.fs_access,
283 visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
285 for d in list(discovered):
286 # Only interested in discovered secondaryFiles which are local
287 # files that need to be uploaded.
288 if d.startswith("file:"):
289 sc.extend(discovered[d])
293 mapper = ArvPathMapper(arvrunner, sc, "",
297 single_collection=True)
300 loc = p.get("location")
301 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
302 p["location"] = mapper.mapper(p["location"]).resolved
308 if collectionUUID in p:
309 uuid = p[collectionUUID]
310 if uuid not in uuid_map:
311 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
312 "Collection uuid %s not found" % uuid)
313 gp = collection_pdh_pattern.match(loc)
314 if gp and uuid_map[uuid] != gp.groups()[0]:
315 # This file entry has both collectionUUID and a PDH
316 # location. If the PDH doesn't match the one returned
317 # by the API server, raise an error.
318 raise SourceLine(p, "location", validate.ValidationException).makeError(
319 "Expected collection uuid %s to be %s but API server reported %s" % (
320 uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
322 gp = collection_uuid_pattern.match(loc)
325 uuid = gp.groups()[0]
326 if uuid not in uuid_map:
327 raise SourceLine(p, "location", validate.ValidationException).makeError(
328 "Collection uuid %s not found" % uuid)
329 p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
330 p[collectionUUID] = uuid
332 visit_class(workflowobj, ("File", "Directory"), setloc)
333 visit_class(discovered, ("File", "Directory"), setloc)
335 if discovered_secondaryfiles is not None:
337 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
339 if "$schemas" in workflowobj:
341 for s in workflowobj["$schemas"]:
342 sch.append(mapper.mapper(s).resolved)
343 workflowobj["$schemas"] = sch
348 def upload_docker(arvrunner, tool):
349 """Uploads Docker images used in CommandLineTool objects."""
351 if isinstance(tool, CommandLineTool):
352 (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
354 if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
355 # TODO: can be supported by containers API, but not jobs API.
356 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
357 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
358 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
360 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
361 elif isinstance(tool, cwltool.workflow.Workflow):
363 upload_docker(arvrunner, s.embedded_tool)
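# Illustrative DockerRequirement handled by upload_docker() above (the image
# name is an arbitrary example):
#   requirements:
#     DockerRequirement:
#       dockerPull: ubuntu:20.04
# When a CommandLineTool has no DockerRequirement, the default "arvados/jobs"
# image is uploaded instead.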
366 def packed_workflow(arvrunner, tool, merged_map):
367 """Create a packed workflow.
369 A "packed" workflow is one where all the components have been combined into a single document."""
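# A minimal sketch (assumed shape, consistent with cwltool's pack() output) of
# a packed workflow document:
#   {"cwlVersion": "v1.0",
#    "$graph": [
#        {"class": "Workflow", "id": "#main", "steps": [...]},
#        {"class": "CommandLineTool", "id": "#mytool.cwl", ...}]}
# so every step's "run" points at an entry inside $graph instead of an
# external file.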
372 packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
373 tool.tool["id"], tool.metadata, rewrite_out=rewrites)
375 rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
377 def visit(v, cur_id):
378 if isinstance(v, dict):
379 if v.get("class") in ("CommandLineTool", "Workflow"):
381 raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
382 cur_id = rewrite_to_orig.get(v["id"], v["id"])
383 if "location" in v and not v["location"].startswith("keep:"):
384 v["location"] = merged_map[cur_id].resolved[v["location"]]
385 if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
386 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
387 if v.get("class") == "DockerRequirement":
388 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
391 if isinstance(v, list):
398 def tag_git_version(packed):
399 if tool.tool["id"].startswith("file://"):
400 path = os.path.dirname(tool.tool["id"][7:])
402 githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
403 except (OSError, subprocess.CalledProcessError):
406 packed["http://schema.org/version"] = githash
409 def upload_job_order(arvrunner, name, tool, job_order):
410 """Upload local files referenced in the input object and return an updated
411 input object with 'location' fields rewritten to the proper Keep references.
414 # Make a copy of the job order and set defaults.
415 builder_job_order = copy.copy(job_order)
416 fill_in_defaults(tool.tool["inputs"],
419 # Need to create a builder object to evaluate expressions.
420 builder = make_builder(builder_job_order,
421 tool.tool.get("hints", []),
422 tool.tool.get("requirements", []),
424 # Now update job_order with secondaryFiles
425 discover_secondary_files(arvrunner.fs_access,
430 jobmapper = upload_dependencies(arvrunner,
434 job_order.get("id", "#"),
437 if "id" in job_order:
440 # Need to filter this out, gets added by cwltool when providing
441 # parameters on the command line.
442 if "job_order" in job_order:
443 del job_order["job_order"]
447 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
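# Illustrative shape of the merged_map built by upload_workflow_deps() below
# (paths and Keep references are made-up examples):
#   {"file:///home/user/wf.cwl": FileUpdates(
#        resolved={"file:///home/user/script.py": "keep:<pdh>/script.py"},
#        secondaryFiles={})}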
449 def upload_workflow_deps(arvrunner, tool):
450 # Ensure that Docker images needed by this workflow are available
452 upload_docker(arvrunner, tool)
454 document_loader = tool.doc_loader
458 def upload_tool_deps(deptool):
460 discovered_secondaryfiles = {}
461 pm = upload_dependencies(arvrunner,
462 "%s dependencies" % (shortname(deptool["id"])),
467 include_primary=False,
468 discovered_secondaryfiles=discovered_secondaryfiles)
469 document_loader.idx[deptool["id"]] = deptool
471 for k,v in pm.items():
472 toolmap[k] = v.resolved
473 merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
475 tool.visit(upload_tool_deps)
479 def arvados_jobs_image(arvrunner, img):
480 """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
483 return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
484 except Exception as e:
485 raise Exception("Docker image %s is not available\n%s" % (img, e))
488 def upload_workflow_collection(arvrunner, name, packed):
489 collection = arvados.collection.Collection(api_client=arvrunner.api,
490 keep_client=arvrunner.keep_client,
491 num_retries=arvrunner.num_retries)
492 with collection.open("workflow.cwl", "w") as f:
493 f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
495 filters = [["portable_data_hash", "=", collection.portable_data_hash()],
496 ["name", "like", name+"%"]]
497 if arvrunner.project_uuid:
498 filters.append(["owner_uuid", "=", arvrunner.project_uuid])
499 exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
502 logger.info("Using collection %s", exists["items"][0]["uuid"])
504 collection.save_new(name=name,
505 owner_uuid=arvrunner.project_uuid,
506 ensure_unique_name=True,
507 num_retries=arvrunner.num_retries)
508 logger.info("Uploaded to %s", collection.manifest_locator())
510 return collection.portable_data_hash()
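# Usage note (summarizing the logic above): if a collection with the same
# portable data hash and a matching name already exists in the target project,
# it is reused rather than saved again; otherwise a new collection is created.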
513 class Runner(Process):
514 """Base class for runner processes, which submit an instance of
515 arvados-cwl-runner and wait for the final result."""
517 def __init__(self, runner, tool, loadingContext, enable_reuse,
518 output_name, output_tags, submit_runner_ram=0,
519 name=None, on_error=None, submit_runner_image=None,
520 intermediate_output_ttl=0, merged_map=None,
521 priority=None, secret_store=None,
522 collection_cache_size=256,
523 collection_cache_is_default=True):
525 loadingContext = loadingContext.copy()
526 loadingContext.metadata = loadingContext.metadata.copy()
527 loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION
529 super(Runner, self).__init__(tool.tool, loadingContext)
531 self.arvrunner = runner
532 self.embedded_tool = tool
533 self.job_order = None
536 # If reuse is permitted by command line arguments but
537 # disabled by the workflow itself, disable it.
538 reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
540 enable_reuse = reuse_req["enableReuse"]
541 self.enable_reuse = enable_reuse
543 self.final_output = None
544 self.output_name = output_name
545 self.output_tags = output_tags
547 self.on_error = on_error
548 self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
549 self.intermediate_output_ttl = intermediate_output_ttl
550 self.priority = priority
551 self.secret_store = secret_store
552 self.enable_dev = loadingContext.enable_dev
554 self.submit_runner_cores = 1
555 self.submit_runner_ram = 1024 # default 1 GiB
556 self.collection_cache_size = collection_cache_size
558 runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
559 if runner_resource_req:
560 if runner_resource_req.get("coresMin"):
561 self.submit_runner_cores = runner_resource_req["coresMin"]
562 if runner_resource_req.get("ramMin"):
563 self.submit_runner_ram = runner_resource_req["ramMin"]
564 if runner_resource_req.get("keep_cache") and collection_cache_is_default:
565 self.collection_cache_size = runner_resource_req["keep_cache"]
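# Illustrative hint consumed by the block above (values are arbitrary examples;
# "arv:" is the conventional prefix for http://arvados.org/cwl#):
#   hints:
#     arv:WorkflowRunnerResources:
#       coresMin: 2
#       ramMin: 2048
#       keep_cache: 512
# Command-line overrides (submit_runner_ram below) still take precedence.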
567 if submit_runner_ram:
568 # Command line / initializer overrides default and/or spec from workflow
569 self.submit_runner_ram = submit_runner_ram
571 if self.submit_runner_ram <= 0:
572 raise Exception("Value of submit-runner-ram must be greater than zero")
574 if self.submit_runner_cores <= 0:
575 raise Exception("Value of submit-runner-cores must be greater than zero")
577 self.merged_map = merged_map or {}
580 job_order, # type: Mapping[Text, Text]
581 output_callbacks, # type: Callable[[Any, Any], Any]
582 runtimeContext # type: RuntimeContext
583 ): # type: (...) -> Generator[Any, None, None]
584 self.job_order = job_order
585 self._init_job(job_order, runtimeContext)
588 def update_pipeline_component(self, record):
591 def done(self, record):
592 """Base method for handling a completed runner."""
595 if record["state"] == "Complete":
596 if record.get("exit_code") is not None:
597 if record["exit_code"] == 33:
598 processStatus = "UnsupportedRequirement"
599 elif record["exit_code"] == 0:
600 processStatus = "success"
602 processStatus = "permanentFail"
604 processStatus = "success"
606 processStatus = "permanentFail"
610 if processStatus == "permanentFail":
611 logc = arvados.collection.CollectionReader(record["log"],
612 api_client=self.arvrunner.api,
613 keep_client=self.arvrunner.keep_client,
614 num_retries=self.arvrunner.num_retries)
615 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
617 self.final_output = record["output"]
618 outc = arvados.collection.CollectionReader(self.final_output,
619 api_client=self.arvrunner.api,
620 keep_client=self.arvrunner.keep_client,
621 num_retries=self.arvrunner.num_retries)
622 if "cwl.output.json" in outc:
623 with outc.open("cwl.output.json", "rb") as f:
625 outputs = json.loads(f.read().decode())
626 def keepify(fileobj):
627 path = fileobj["location"]
628 if not path.startswith("keep:"):
629 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
630 adjustFileObjs(outputs, keepify)
631 adjustDirObjs(outputs, keepify)
633 logger.exception("[%s] While getting final output object", self.name)
634 self.arvrunner.output_callback({}, "permanentFail")
636 self.arvrunner.output_callback(outputs, processStatus)