# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import copy
import json
import logging
import urllib.parse
from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess
from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]
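
# Illustrative example (hypothetical literal): given
#   {"class": "File", "location": "_:b0b5...", "basename": "a.txt",
#    "path": "/tmp/a.txt", "nameroot": "a", "nameext": ".txt", "contents": "x"}
# trim_anonymous_location() drops the anonymous "_:" location and
# remove_redundant_fields() drops "path", "nameext", "nameroot" and "dirname",
# leaving only the fields worth writing back out.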

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
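
# Illustrative example (hypothetical fragment): find_defaults() walks nested
# lists and dicts and calls op() on every dict that has a "default" key, e.g.
#   find_defaults({"inputs": [{"id": "x", "type": "File",
#                              "default": {"class": "File", "path": "a.txt"}}]}, op)
# invokes op() once, on the input parameter dict.  upload_dependencies() uses
# this to locate default File/Directory values that may need uploading.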

def make_builder(joborder, hints, requirements, runtimeContext):
    return Builder(
                 job=joborder,
                 files=[],                  # type: List[Dict[Text, Text]]
                 bindings=[],               # type: List[Dict[Text, Any]]
                 schemaDefs={},             # type: Dict[Text, Dict[Text, Any]]
                 names=None,                # type: Names
                 requirements=requirements, # type: List[Dict[Text, Any]]
                 hints=hints,               # type: List[Dict[Text, Any]]
                 resources={},              # type: Dict[str, int]
                 mutation_manager=None,     # type: Optional[MutationManager]
                 formatgraph=None,          # type: Optional[Graph]
                 make_fs_access=None,       # type: Type[StdFsAccess]
                 fs_access=None,            # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,                     # type: float
                 debug=runtimeContext.debug,                              # type: bool
                 js_console=runtimeContext.js_console,                    # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
                 loadListing="",            # type: Text
                 outdir="",                 # type: Text
                 tmpdir="",                 # type: Text
                 stagedir="",               # type: Text
                )
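
# Illustrative usage (sketch): the Builder returned here is only used to
# evaluate expressions and parameter references, e.g.
#   builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext)
#   builder.do_eval("$(inputs.sample.basename)")
# The file/binding/outdir fields are left empty because no command line is
# actually being constructed.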

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None


primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                specs.append({"pattern": pattern})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = sf["basename"]
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                found.append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
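
# Illustrative example (hypothetical input): for an input schema
#   {"type": "File", "secondaryFiles": [{"pattern": ".bai", "required": True}]}
# and a primary value {"class": "File", "location": "file:///data/reads.bam"},
# set_secondary() evaluates the pattern, checks that
# "file:///data/reads.bam.bai" exists through fsaccess, and fills in
#   primary["secondaryFiles"] == [{"location": "file:///data/reads.bam.bai", "class": "File"}]
# mirroring the result into 'discovered' keyed by the primary location.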

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
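
# Illustrative example (hypothetical job order): 'inputs' is the tool's list of
# input parameter schemas and job_order maps short input names to values, e.g.
#   discover_secondary_files(fs_access, builder, tool.tool["inputs"],
#                            {"reads": {"class": "File",
#                                       "location": "file:///data/reads.bam"}},
#                            discovered={})
# Only Mapping/Sequence values (File/Directory objects or arrays) are examined.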

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        # Collect collection uuids that need to be resolved to
        # portable data hashes
        gp = collection_uuid_pattern.match(loc)
        if gp:
            uuids[gp.groups()[0]] = obj
        if collectionUUID in obj:
            uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit
        # response size, so keep fetching until there is nothing left
        # to fetch.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext())
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
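
# Illustrative usage (sketch): upload the references of a tool document and
# rewrite them in place, e.g.
#   mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                False)
#   mapper.mapper("file:///home/me/input.txt").resolved  # e.g. "keep:<pdh>/input.txt"
# Afterwards File/Directory entries inside tool.tool point at keep: locations.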


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
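
# Illustrative example (hypothetical tool): for a CommandLineTool carrying
#   requirements:
#     DockerRequirement:
#       dockerPull: ubuntu:20.04
# upload_docker() makes sure the image exists as an Arvados collection (pulling
# and uploading it if necessary); for a Workflow it simply recurses into each
# step's embedded tool.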


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)

    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash

    tag_git_version(packed)

    return packed
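
# Illustrative usage (sketch):
#   merged_map = upload_workflow_deps(arvrunner, tool)
#   packed = packed_workflow(arvrunner, tool, merged_map)
# 'packed' is a single CWL document with file references rewritten to keep:
# locations and, when the source lives in a git checkout, a
# http://schema.org/version annotation recording the commit hash.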


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext())
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
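
# Illustrative usage (hypothetical input object):
#   job_order = upload_job_order(arvrunner, "input object", tool,
#                                {"reads": {"class": "File",
#                                           "location": "file:///data/reads.bam"}})
# On return the File location has been rewritten to a keep: reference and any
# discovered secondaryFiles have been filled in.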

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])


def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        discovered_secondaryfiles = {}
        pm = upload_dependencies(arvrunner,
                                 "%s dependencies" % (shortname(deptool["id"])),
                                 document_loader,
                                 deptool,
                                 deptool["id"],
                                 False,
                                 include_primary=False,
                                 discovered_secondaryfiles=discovered_secondaryfiles)
        document_loader.idx[deptool["id"]] = deptool
        toolmap = {}
        for k, v in pm.items():
            toolmap[k] = v.resolved
        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map


def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
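
# Illustrative usage (sketch):
#   arvados_jobs_image(arvrunner, "arvados/jobs:" + __version__)
# returns whatever arv_docker_get_image() reports for the image once it is
# available, and raises with the underlying error message if it cannot be
# pulled or uploaded.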


def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
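
# Illustrative usage (sketch):
#   pdh = upload_workflow_collection(arvrunner, "my workflow", packed)
# The packed document is written as workflow.cwl into a new collection unless a
# collection with the same portable data hash and a matching name already
# exists in the target project, in which case the existing one is reused.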


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse

        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]
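
        # Illustrative example (hypothetical hint): a workflow can request more
        # resources for the runner container with the Arvados extension
        #   hints:
        #     arv:WorkflowRunnerResources:
        #       coresMin: 2
        #       ramMin: 2048
        #       keep_cache: 512
        # (arv: = http://arvados.org/cwl#); those values feed
        # submit_runner_cores, submit_runner_ram and collection_cache_size above.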

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
686 """Base method for handling a completed runner."""
689 if record["state"] == "Complete":
690 if record.get("exit_code") is not None:
691 if record["exit_code"] == 33:
692 processStatus = "UnsupportedRequirement"
693 elif record["exit_code"] == 0:
694 processStatus = "success"
696 processStatus = "permanentFail"
698 processStatus = "success"
700 processStatus = "permanentFail"
704 if processStatus == "permanentFail":
705 logc = arvados.collection.CollectionReader(record["log"],
706 api_client=self.arvrunner.api,
707 keep_client=self.arvrunner.keep_client,
708 num_retries=self.arvrunner.num_retries)
709 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
711 self.final_output = record["output"]
712 outc = arvados.collection.CollectionReader(self.final_output,
713 api_client=self.arvrunner.api,
714 keep_client=self.arvrunner.keep_client,
715 num_retries=self.arvrunner.num_retries)
716 if "cwl.output.json" in outc:
717 with outc.open("cwl.output.json", "rb") as f:
719 outputs = json.loads(f.read().decode())
720 def keepify(fileobj):
721 path = fileobj["location"]
722 if not path.startswith("keep:"):
723 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
724 adjustFileObjs(outputs, keepify)
725 adjustDirObjs(outputs, keepify)
727 logger.exception("[%s] While getting final output object", self.name)
728 self.arvrunner.output_callback({}, "permanentFail")
730 self.arvrunner.output_callback(outputs, processStatus)