# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import copy
import json
import logging
import urllib.parse
from functools import partial
from collections import namedtuple
from io import StringIO

from cwltool.utils import (

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document, jobloaderctx
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
import schema_salad.ref_resolver

import arvados.collection
import arvados.util

from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from .context import ArvRuntimeContext
from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
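
# Illustrative example (values are hypothetical): a File literal such as
#   {"class": "File", "basename": "blub.txt", "contents": "...", "location": "_:7c3f..."}
# has its anonymous "_:" location dropped so the record can be written back
# out reproducibly.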

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
        job=joborder,
        files=[],                  # type: List[Dict[Text, Text]]
        bindings=[],               # type: List[Dict[Text, Any]]
        schemaDefs={},             # type: Dict[Text, Dict[Text, Any]]
        names=None,                # type: Names
        requirements=requirements, # type: List[Dict[Text, Any]]
        hints=hints,               # type: List[Dict[Text, Any]]
        resources={},              # type: Dict[str, int]
        mutation_manager=None,     # type: Optional[MutationManager]
        formatgraph=None,          # type: Optional[Graph]
        make_fs_access=None,       # type: Type[StdFsAccess]
        fs_access=None,            # type: StdFsAccess
        job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
        timeout=runtimeContext.eval_timeout,                     # type: float
        debug=runtimeContext.debug,                              # type: bool
        js_console=runtimeContext.js_console,                    # type: bool
        force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
        loadListing="",            # type: Text
        outdir="",                 # type: Text
        tmpdir="",                 # type: Text
        stagedir="",               # type: Text
        cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
        container_engine="docker"
    )
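
# Usage sketch (pattern and inputs are hypothetical): make_builder is used
# below only to obtain a cwltool Builder capable of evaluating CWL parameter
# references and expressions, along the lines of
#   builder = make_builder(job_order, hints, requirements, runtimeContext, metadata)
#   sfname = builder.do_eval("$(inputs.sample.basename).bai", context=primary)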

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
            not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            return

        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = sf
            else:
                pattern = sf["pattern"]
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)

                if isinstance(sfname, str):
                    p_location = primary["location"]
                    if "/" in p_location:
                        sfpath = p_location[0 : p_location.rindex("/") + 1] + sfname

            required = builder.do_eval(required, context=primary)

            if isinstance(sfname, list) or isinstance(sfname, dict):
                each = aslist(sfname)
                for e in each:
                    if required and not fsaccess.exists(e.get("location")):
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "Required secondary file '%s' does not exist" % e.get("location"))

            if isinstance(sfname, str):
                if fsaccess.exists(sfpath):
                    if pattern is not None:
                        found.append({"location": sfpath, "class": "File"})
                elif required:
                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                        "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
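
# Note on the pattern handling above: a pattern containing "$(" or "${" is
# evaluated as a CWL expression via builder.do_eval(); otherwise substitute()
# applies the usual CWL secondaryFiles suffix rules, e.g. (hypothetical names)
# a primary "reads.bam" with pattern ".bai" yields "reads.bam.bai", while
# "^.bai" strips the last extension first and yields "reads.bai".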

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
                        cache=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """
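
    # Illustrative mapping (path and hash are hypothetical): the returned
    # pathmapper resolves local references to Keep locations, e.g.
    #   mapper.mapper("file:///home/user/inputs/reads.fastq").resolved
    #   => "keep:99999999999999999999999999999999+118/reads.fastq"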

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            if cache is not None and defrg in cache:
                return cache[defrg]
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            result = yamlloader.load(textIO)
            if cache is not None:
                cache[defrg] = result
            return result

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
        if cache is not None and defrg not in cache:
            # If we haven't seen this file before, we want the raw file
            # content (before preprocessing) to ensure that external
            # references like $include haven't already been inlined.
            scanobj = loadref("", workflowobj["id"])

    with Perf(metrics, "scandeps include, location"):
        sc_result = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$include", "location")),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    with Perf(metrics, "scandeps $schemas"):
        optional_deps = scandeps(uri, scanobj,
                                 loadref_fields,
                                 set(("$schemas",)),
                                 loadref, urljoin=document_loader.fetcher.urljoin,
                                 nestdirs=False)

    if sc_result is None:
        sc_result = []

    if optional_deps is None:
        optional_deps = []

    sc_result.extend(optional_deps)

    def collect_uuids(obj):
        loc = obj.get("location", "")
        # Collect collection uuids that need to be resolved to
        # portable data hashes
        gp = collection_uuid_pattern.match(loc)
        if gp:
            uuids[gp.groups()[0]] = obj
        if collectionUUID in obj:
            uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)

    with Perf(metrics, "collect uuids"):
        visit_class(workflowobj, ("File", "Directory"), collect_uuids)

    with Perf(metrics, "collect uploads"):
        visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    with Perf(metrics, "fetch_uuids"):
        while fetch_uuids:
            # For a large number of fetch_uuids, the API server may limit the
            # response size, so keep fetching until nothing is left to fetch.
            lookups = arvrunner.api.collections().list(
                filters=[["uuid", "in", fetch_uuids]],
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=arvrunner.num_retries)

            if not lookups["items"]:
                break

            for l in lookups["items"]:
                uuid_map[l["uuid"]] = l["portable_data_hash"]

            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)
431 if "id" in workflowobj:
432 defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
434 # make sure it's included
435 sc.append({"class": "File", "location": defrg})
437 # make sure it's excluded
438 sc = [d for d in sc if d.get("location") != defrg]
440 def visit_default(obj):
441 def defaults_are_optional(f):
442 if "location" not in f and "path" in f:
443 f["location"] = f["path"]
445 normalizeFilesDirs(f)
446 optional_deps.append(f)
447 visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
449 find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    _jobloaderctx = jobloaderctx.copy()
    loader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=document_loader.fetcher_constructor)
    copied, _ = loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
482 with Perf(metrics, "mapper"):
483 mapper = ArvPathMapper(arvrunner, sc, "",
487 single_collection=True,
488 optional_deps=optional_deps)
492 if k.startswith("keep:"):
493 keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            addkeepref(p["location"])
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    with Perf(metrics, "setloc"):
        visit_class(workflowobj, ("File", "Directory"), setloc)
        visit_class(discovered, ("File", "Directory"), setloc)
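
    # Illustrative (hypothetical identifiers): setloc rewrites a location such as
    #   "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/reads.fastq"
    # to
    #   "keep:99999999999999999999999999999999+118/reads.fastq"
    # using uuid_map, and records the original collection uuid under the
    # collectionUUID key.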

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                     filters=[["portable_data_hash", "in", list(keeprefs)],
                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
                                     select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)
572 if "$schemas" in workflowobj:
574 for s in workflowobj["$schemas"]:
576 sch.append(mapper.mapper(s).resolved)
577 workflowobj["$schemas"] = sch

def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)

def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""
    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext.project_uuid,
                                                                                                             runtimeContext.force_docker_pull,
                                                                                                             runtimeContext.tmp_outdir_prefix,
                                                                                                             runtimeContext.match_local_docker,
                                                                                                             runtimeContext.copy_deps)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    for g in git_info:
        packed[g] = git_info[g]

def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return an updated
    input object with 'location' pointing to the proper Keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if any required parameters are missing;
    # we don't want that here, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)

    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           runtimeContext,
                           tool.metadata)

    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False,
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when parameters are
    # provided on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
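
# FileUpdates entries are stored per tool id in merged_map (see
# upload_workflow_deps below): "resolved" maps original file locations to
# their uploaded Keep locations, and "secondaryFiles" maps a location to the
# secondaryFiles discovered for it.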

def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available
    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}
    tool_dep_cache = {}

    def upload_tool_deps(deptool):
        discovered_secondaryfiles = {}
        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     runtimeContext,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles,
                                     cache=tool_dep_cache)
        document_loader.idx[deptool["id"]] = deptool
        toolmap = {}
        for k, v in pm.items():
            toolmap[k] = v.resolved
        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True,
                                                          runtimeContext.project_uuid,
                                                          runtimeContext.force_docker_pull,
                                                          runtimeContext.tmp_outdir_prefix,
                                                          runtimeContext.match_local_docker,
                                                          runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True,
                 git_info=None):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev
        self.git_info = git_info

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]
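
        # Illustrative CWL hint (assuming the "arv" namespace prefix maps to
        # http://arvados.org/cwl#): a workflow can request more resources for
        # the runner container with
        #   hints:
        #     arv:WorkflowRunnerResources:
        #       coresMin: 2
        #       ramMin: 2048
        #       keep_cache: 1024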

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
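            # e.g. (hypothetical value) a relative output location such as
            # "out/report.txt" becomes "keep:" + record["output"] + "/out/report.txt".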
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)