# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
import arvados.util

from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
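
# For example (hypothetical literal): {"class": "File", "location": "_:2afb...",
# "contents": "hello"} is written back out as {"class": "File", "contents":
# "hello"} after trim_anonymous_location runs.
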

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    """Recursively find dicts carrying a 'default' field and apply op to them."""
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
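
# A minimal sketch of the callback pattern (hypothetical document):
#
#   doc = {"inputs": [{"id": "x", "default": {"class": "File", "path": "a.txt"}}]}
#   find_defaults(doc, lambda d: print(d["default"]))
#   # prints {'class': 'File', 'path': 'a.txt'}
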

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],                     # type: List[Dict[Text, Text]]
                   bindings=[],                  # type: List[Dict[Text, Any]]
                   schemaDefs={},                # type: Dict[Text, Dict[Text, Any]]
                   names=None,                   # type: Names
                   requirements=requirements,    # type: List[Dict[Text, Any]]
                   hints=hints,                  # type: List[Dict[Text, Any]]
                   resources={},                 # type: Dict[str, int]
                   mutation_manager=None,        # type: Optional[MutationManager]
                   formatgraph=None,             # type: Optional[Graph]
                   make_fs_access=None,          # type: Type[StdFsAccess]
                   fs_access=None,               # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,                     # type: float
                   debug=runtimeContext.debug,                              # type: bool
                   js_console=runtimeContext.js_console,                    # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
                   loadListing="",               # type: Text
                   outdir="",                    # type: Text
                   tmpdir="",                    # type: Text
                   stagedir="",                  # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                  )

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
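
# A sketch of the pattern expansion above (standard CWL semantics, with a
# hypothetical input): for a primary file "keep:.../sample.bam", the
# secondaryFiles pattern ".bai" yields "keep:.../sample.bam.bai" via
# substitute(), while "^.bai" first strips the ".bam" extension and yields
# "keep:.../sample.bai".
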

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            addkeepref(p["location"])
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                                            filters=[["portable_data_hash", "in", list(keeprefs)],
                                                                     ["owner_uuid", "=", runtimeContext.project_uuid]],
                                                            select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
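
# A minimal sketch of how the mapper returned by upload_dependencies() is
# consumed elsewhere in this file (names hypothetical; actual "keep:" values
# depend on what was uploaded):
#
#   pm = upload_dependencies(arvrunner, name, loader, wfobj, uri, False, ctx)
#   for src, entry in pm.items():
#       print(src, "->", entry.resolved)  # e.g. file:///x.txt -> keep:<pdh>/x.txt
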

def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)
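
# For example (hypothetical tool): a CommandLineTool carrying
#
#   requirements:
#     DockerRequirement:
#       dockerPull: ubuntu:20.04
#
# has its ubuntu:20.04 image pulled (if needed) and stored in Keep; a tool
# with no DockerRequirement falls back to the arvados/jobs image above.
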

def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext.project_uuid,
                                                                                                             runtimeContext.force_docker_pull,
                                                                                                             runtimeContext.tmp_outdir_prefix,
                                                                                                             runtimeContext.match_local_docker,
                                                                                                             runtimeContext.copy_deps)
            for d in v:
                visit(v[d], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)

    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash
    tag_git_version(packed)

    return packed
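
# A minimal sketch of the flow (hypothetical call):
#
#   packed = packed_workflow(arvrunner, tool, merged_map, runtimeContext)
#
# "packed" is a single CWL document; file locations recorded in merged_map
# have been rewritten to "keep:" references, and each DockerRequirement now
# carries an "http://arvados.org/cwl#dockerCollectionPDH" hint.
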

def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           runtimeContext,
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False,
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
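
# A minimal usage sketch (hypothetical input object): given
#
#   job_order = {"ref": {"class": "File", "location": "file:///tmp/ref.fa"}}
#
# upload_job_order() uploads ref.fa to Keep and returns the object with
# "location" rewritten to a "keep:<pdh>/ref.fa" reference.
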

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
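
# FileUpdates.resolved maps original file locations to "keep:" references;
# FileUpdates.secondaryFiles maps resolved locations to the secondaryFiles
# discovered for them.  packed_workflow() consults both when rewriting the
# packed document.
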

def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     runtimeContext,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True,
                                                          runtimeContext.project_uuid,
                                                          runtimeContext.force_docker_pull,
                                                          runtimeContext.tmp_outdir_prefix,
                                                          runtimeContext.match_local_docker,
                                                          runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
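
# e.g. (hypothetical call): arvados_jobs_image(arvrunner, "arvados/jobs:2.4.0",
# runtimeContext) returns the identifier that arvdocker recorded for the
# uploaded image collection.
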

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
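
# A minimal usage sketch (hypothetical arguments):
#
#   pdh = upload_workflow_collection(arvrunner, "my-workflow", packed)
#
# "pdh" identifies a collection containing workflow.cwl; an existing
# collection with the same portable data hash and name prefix is reused
# rather than saved again.
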

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
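            # e.g. (hypothetical output entry): a relative location "out.txt"
            # becomes "keep:<output pdh>/out.txt" after keepify runs.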
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)