1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
14 from functools import partial
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
22 if os.name == "posix" and sys.version_info[0] < 3:
23 import subprocess32 as subprocess
27 from schema_salad.sourceline import SourceLine, cmap
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32 shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.utils import aslist
36 from cwltool.builder import substitute
37 from cwltool.pack import pack
38 from cwltool.update import INTERNAL_VERSION
39 from cwltool.builder import Builder
40 import schema_salad.validate as validate
42 import arvados.collection
43 from .util import collectionUUID
44 import ruamel.yaml as yaml
45 from ruamel.yaml.comments import CommentedMap, CommentedSeq
47 import arvados_cwl.arvdocker
48 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
49 from ._version import __version__
51 from . context import ArvRuntimeContext
# Module-level logger shared by all arvados-cwl-runner components.
logger = logging.getLogger('arvados.cwl-runner')
def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not have a 'location'
    field, remove it.
    """
    # Internally-assigned anonymous ids carry a "_:" prefix.
    if obj.get("location", "").startswith("_:"):
def remove_redundant_fields(obj):
    # These fields are derivable from 'location'/'basename' and may embed
    # local filesystem details, so they are stripped before serialization.
    for field in ("path", "nameext", "nameroot", "dirname"):
def find_defaults(d, op):
    # Depth-first walk over nested lists/dicts; presumably invokes op() on
    # mappings that define a "default" key (see visit_default in
    # upload_dependencies, which reads obj["default"]) — TODO confirm,
    # the dispatching branch is not visible here.
    if isinstance(d, list):
    elif isinstance(d, dict):
        for i in viewvalues(d):
def make_builder(joborder, hints, requirements, runtimeContext):
    """Construct a cwltool Builder for evaluating CWL expressions.

    Most collaborator slots (fs_access, formatgraph, mutation_manager, ...)
    are left empty/None; only the job order, hints/requirements and a few
    settings taken from runtimeContext are populated, which is sufficient
    for the expression evaluation done by the callers in this module.
    """
        files=[],                  # type: List[Dict[Text, Text]]
        bindings=[],               # type: List[Dict[Text, Any]]
        schemaDefs={},             # type: Dict[Text, Dict[Text, Any]]
        names=None,                # type: Names
        requirements=requirements, # type: List[Dict[Text, Any]]
        hints=hints,               # type: List[Dict[Text, Any]]
        resources={},              # type: Dict[str, int]
        mutation_manager=None,     # type: Optional[MutationManager]
        formatgraph=None,          # type: Optional[Graph]
        make_fs_access=None,       # type: Type[StdFsAccess]
        fs_access=None,            # type: StdFsAccess
        job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
        timeout=runtimeContext.eval_timeout,           # type: float
        debug=runtimeContext.debug,                    # type: bool
        js_console=runtimeContext.js_console,          # type: bool
        force_docker_pull=runtimeContext.force_docker_pull, # type: bool
        loadListing="",            # type: Text
        outdir="",                 # type: Text
        tmpdir="",                 # type: Text
        stagedir="",               # type: Text
def search_schemadef(name, reqs):
    # Scan 'reqs' (an iterable of requirement/hint dicts) for a
    # SchemaDefRequirement declaring a type called 'name'; presumably
    # returns the matching type definition 'sd' — TODO confirm, the loop
    # header over reqs and the return are not visible here.
    if r["class"] == "SchemaDefRequirement":
        for sd in r["types"]:
            if sd["name"] == name:
# CWL type names at which set_secondary() stops recursing: values of these
# types cannot themselves carry nested secondaryFiles declarations.
primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    """Resolve and attach secondaryFiles to 'primary' per 'inputschema'.

    Recurses through union/record/array input schemas; when it reaches a
    File value without secondaryFiles, it evaluates each secondaryFiles
    pattern with builder.do_eval(), checks existence via fsaccess, and, if
    'discovered' is a dict, records location -> secondaryFiles there.
    """
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)

    if isinstance(inputschema, basestring):
        # Named type reference: look it up in SchemaDefRequirement
        # (hints searched after requirements via reversed()).
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
            not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        primary["secondaryFiles"] = secondaryspec
        # First pass: evaluate each pattern expression and normalize the
        # results into a list of spec dicts.
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                specs.append({"pattern": pattern})
            # Any other evaluation result is rejected.
            raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                "Expression must return list, object, string or null")
        # Second pass: turn each spec into a concrete path and verify it.
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = sf["basename"]
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
            raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                "Expression must return list, object, string or null")
            # substitute() applies the ^/suffix pattern to the primary location.
            sfpath = substitute(primary["location"], pattern)
            # 'required' may itself be an expression; evaluate it in context.
            required = builder.do_eval(required, context=primary)
            if fsaccess.exists(sfpath):
                found.append({"location": sfpath, "class": "File"})
            # Missing secondary files are an error only when required.
            raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                "Required secondary file '%s' does not exist" % sfpath)
        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        # Non-primitive named type: recurse into its definition.
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    """For each input parameter, locate its value in job_order and attach
    any secondaryFiles declared by the parameter's schema (via set_secondary)."""
    for schema in inputs:
        value = job_order.get(shortname(schema["id"]))
        # Only File-like mappings and arrays can carry secondaryFiles.
        if not isinstance(value, (Mapping, Sequence)):
            continue
        set_secondary(fsaccess, builder, schema, None, value, discovered)
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references. Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """
        # (body of loadref(b, u): fetch and parse a referenced document,
        # caching by its defragged URI)
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
                textIO = StringIO(text)
            return yaml.safe_load(textIO)

    # "run" references are followed only when loadref_run is set.
    loadref_fields = set(("$import", "run"))
    loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    def collect_uuids(obj):
        loc = obj.get("location", "")
        # Collect collection uuids that need to be resolved to
        # portable data hashes
        gp = collection_uuid_pattern.match(loc)
        uuids[gp.groups()[0]] = obj
        if collectionUUID in obj:
            uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        if sp[0] in ("file", "http", "https"):
            # Record local files than need to be uploaded,
            # don't include file literals, keep references, etc.

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    fetch_uuids = list(uuids.keys())
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching from API server has nothing
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        # Drop uuids already resolved; remaining ones are fetched next round.
        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    # Optionally upload the workflow document itself alongside dependencies.
    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)

    find_defaults(workflowobj, visit_default)

    def discover_default_secondary_files(obj):
        # Evaluate secondaryFiles expressions against each tool's defaults,
        # since those defaults may reference files needing upload too.
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
        discover_secondary_files(arvrunner.fs_access,

    # Work on a resolved deep copy so discovery does not mutate workflowobj.
    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])

    mapper = ArvPathMapper(arvrunner, sc, "",
                           single_collection=True)

        # (body of setloc(p): rewrite each File/Directory location to its
        # uploaded keep: reference and validate collectionUUID consistency)
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        # Rewrite keep:uuid/... to keep:pdh/... but remember the uuid.
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        # Report discoveries to the caller keyed by uploaded (resolved) path.
        discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
            # TODO: can be supported by containers API, but not jobs API.
            raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
        arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        # Tools without a DockerRequirement presumably fall back to the
        # stock arvados/jobs image — confirm against the elided else branch.
        arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        # Recurse into each step's embedded tool ('s' presumably iterates
        # tool.steps — loop header not visible here).
        upload_docker(arvrunner, s.embedded_tool)
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document.  While packing, rewrite File/Directory locations
    to the keep: references recorded in merged_map and resolve Docker images
    to portable data hashes.
    """
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    # pack() reports new-id -> original-id rewrites; invert so we can map
    # embedded process ids back to their merged_map keys.
    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
            if "location" in v and not v["location"].startswith("keep:"):
                # Substitute the uploaded keep: reference recorded earlier.
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
        if isinstance(v, list):

    def tag_git_version(packed):
        # Best effort: annotate with the git commit of the workflow source.
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                packed["http://schema.org/version"] = githash
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,

    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,

    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,

    # Upload everything the input object references and rewrite locations.
    jobmapper = upload_dependencies(arvrunner,
                                    job_order.get("id", "#"),

    # The temporary "id" assigned to the job order is no longer needed.
    if "id" in job_order:

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]
# Per-tool record of path rewrites: 'resolved' maps original locations to
# their uploaded keep: references; 'secondaryFiles' maps locations to the
# secondaryFiles discovered for them (see upload_workflow_deps).
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        # Upload each embedded tool's dependencies (not the tool document
        # itself: include_primary=False) and record the rewrites.
        discovered_secondaryfiles = {}
        pm = upload_dependencies(arvrunner,
                                 "%s dependencies" % (shortname(deptool["id"])),
                                 include_primary=False,
                                 discovered_secondaryfiles=discovered_secondaryfiles)
        # Re-register the (mutated) tool document in the loader's index.
        document_loader.idx[deptool["id"]] = deptool
        for k,v in pm.items():
            toolmap[k] = v.resolved
        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    # visit() calls upload_tool_deps on the workflow and every embedded tool.
    tool.visit(upload_tool_deps)
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        # Wrap any failure (docker pull, upload, API) in a message that
        # names the image; original details are appended to the text.
        raise Exception("Docker image %s is not available\n%s" % (img, e) )
def upload_workflow_collection(arvrunner, name, packed):
    """Save the packed workflow as 'workflow.cwl' in a Keep collection.

    Reuses an existing collection with the same content (matched by
    portable data hash and name prefix) when one is found; returns the
    collection's portable data hash.
    """
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    # Look for an existing identical collection (optionally scoped to the
    # target project) before saving a new one.
    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

        logger.info("Using collection %s", exists["items"][0]["uuid"])
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None

        # If reuse is permitted by command line arguments but
        # disabled by the workflow itself, disable it.
        reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
        enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse

        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.on_error = on_error
        # Default runner image is pinned to this package's version.
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        # Resource defaults, possibly overridden below by a
        # WorkflowRunnerResources hint or by constructor arguments.
        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            # The hint only overrides keep cache size if the caller did not
            # set one explicitly (collection_cache_is_default).
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

             job_order,         # type: Mapping[Text, Text]
             output_callbacks,  # type: Callable[[Any, Any], Any]
             runtimeContext     # type: RuntimeContext
             ):                 # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)

    def update_pipeline_component(self, record):

    def done(self, record):
        """Base method for handling a completed runner."""
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                # Exit code 33 appears to be the sentinel for
                # UnsupportedRequirement — confirm against the submitted
                # runner's exit conventions.
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                processStatus = "permanentFail"
            processStatus = "success"
        processStatus = "permanentFail"

        if processStatus == "permanentFail":
            # On failure, fetch the log collection and show its tail to the user.
            logc = arvados.collection.CollectionReader(record["log"],
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

        self.final_output = record["output"]
        outc = arvados.collection.CollectionReader(self.final_output,
                                                   api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
                                                   num_retries=self.arvrunner.num_retries)
        if "cwl.output.json" in outc:
            with outc.open("cwl.output.json", "rb") as f:
                outputs = json.loads(f.read().decode())

        def keepify(fileobj):
            # Rewrite relative output paths into keep: URIs rooted at the
            # runner's output collection.
            path = fileobj["location"]
            if not path.startswith("keep:"):
                fileobj["location"] = "keep:%s/%s" % (record["output"], path)

        adjustFileObjs(outputs, keepify)
        adjustDirObjs(outputs, keepify)
        # On any error while assembling the output object, report a
        # permanent failure through the output callback.
        logger.exception("[%s] While getting final output object", self.name)
        self.arvrunner.output_callback({}, "permanentFail")
        self.arvrunner.output_callback(outputs, processStatus)