# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import json
import copy
import logging

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap
import schema_salad.validate as validate

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute, Builder
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION

import arvados.collection
import arvados.util

from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from .context import ArvRuntimeContext
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
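
# Example (illustrative values): a literal such as
#   {"class": "File", "location": "_:a1b2c3", "basename": "msg.txt", "contents": "hello"}
# becomes {"class": "File", "basename": "msg.txt", "contents": "hello"},
# which round-trips reproducibly because the anonymous id is gone.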

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
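
# Example (illustrative): given {"id": "x", "default": {"class": "File",
# "path": "data.txt"}}, find_defaults calls op() exactly once, on that dict,
# because it is the first dict on the path that has a "default" key;
# recursion does not descend past it.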

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],               # type: List[Dict[Text, Text]]
                   bindings=[],            # type: List[Dict[Text, Any]]
                   schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                   names=None,             # type: Names
                   requirements=requirements,  # type: List[Dict[Text, Any]]
                   hints=hints,            # type: List[Dict[Text, Any]]
                   resources={},           # type: Dict[str, int]
                   mutation_manager=None,  # type: Optional[MutationManager]
                   formatgraph=None,       # type: Optional[Graph]
                   make_fs_access=None,    # type: Type[StdFsAccess]
                   fs_access=None,         # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,  # type: float
                   debug=runtimeContext.debug,  # type: bool
                   js_console=runtimeContext.js_console,  # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                   loadListing="",         # type: Text
                   outdir="",              # type: Text
                   tmpdir="",              # type: Text
                   stagedir="",            # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                  )
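
# The Builder built here is used only to evaluate expressions (e.g.
# builder.do_eval(...) on secondaryFiles patterns in set_secondary below),
# so the staging- and filesystem-related fields can be left empty or None.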

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
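
# Example (illustrative): for primary {"class": "File", "location":
# "keep:<pdh>/sample.bam"} and a secondaryFiles pattern ".bai", substitute()
# appends the suffix, giving "keep:<pdh>/sample.bam.bai"; a leading caret in
# the pattern ("^.bai") strips one extension from the primary name first,
# giving "keep:<pdh>/sample.bai".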

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # but don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            addkeepref(p["location"])
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if arvrunner.runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(
            arvrunner.api.collections().list,
            filters=[["portable_data_hash", "in", list(keeprefs)],
                     ["owner_uuid", "=", arvrunner.project_uuid]],
            select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": arvrunner.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
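
# Example (illustrative): after upload_dependencies() runs, a reference like
#   {"class": "File", "location": "file:///home/me/inputs/reads.fastq"}
# is rewritten in place to
#   {"class": "File", "location": "keep:<portable data hash>/reads.fastq"}
# and the returned pathmapper can map the original path to the keep reference.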

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker,
                                                       arvrunner.runtimeContext.copy_deps)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker,
                                                       arvrunner.runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or upgrade to cwlVersion: v1.1")
            if "id" in v:
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, v, True,
                    arvrunner.project_uuid,
                    arvrunner.runtimeContext.force_docker_pull,
                    arvrunner.runtimeContext.tmp_outdir_prefix,
                    arvrunner.runtimeContext.match_local_docker,
                    arvrunner.runtimeContext.copy_deps)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed

def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters, and we don't want it to do that, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
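
# Each FileUpdates entry records, for one tool document: "resolved", a dict
# mapping original file locations to keep: references, and "secondaryFiles",
# a dict mapping primary file locations to their discovered secondaryFiles.
# Example (illustrative):
#   FileUpdates(resolved={"file:///wf/data.txt": "keep:<pdh>/data.txt"},
#               secondaryFiles={})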

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
                                                          arvrunner.runtimeContext.match_local_docker,
                                                          arvrunner.runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
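
# Design note: filtering on portable_data_hash makes this upload
# content-addressed; if a collection with identical content and a matching
# name prefix already exists (in the target project, when one is set), it is
# reused rather than duplicated.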

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides the default and/or the
            # spec from the workflow.
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
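
    # Example (illustrative): a workflow can ask for a bigger runner
    # container with the Arvados extension hint consumed above:
    #
    #   hints:
    #     arv:WorkflowRunnerResources:
    #       coresMin: 2
    #       ramMin: 2048
    #       keep_cache: 512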

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)