# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap
import schema_salad.validate as validate

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute, Builder
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION

import arvados.collection
import arvados.util

from . import done
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
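
# Illustrative example (comments only, not executed): a File literal like
#   {"class": "File", "location": "_:b0a88f9e", "basename": "a.txt"}
# has its anonymous "_:" location removed by trim_anonymous_location, leaving
#   {"class": "File", "basename": "a.txt"}.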

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
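
# Illustrative example (comments only): given
#   {"inputs": [{"id": "x", "default": {"class": "File", "path": "d.txt"}}]}
# find_defaults descends through the list and dicts and calls op() once, on
# the input dict, because that dict contains a "default" key.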

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],               # type: List[Dict[Text, Text]]
                   bindings=[],            # type: List[Dict[Text, Any]]
                   schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                   names=None,             # type: Names
                   requirements=requirements,  # type: List[Dict[Text, Any]]
                   hints=hints,            # type: List[Dict[Text, Any]]
                   resources={},           # type: Dict[str, int]
                   mutation_manager=None,  # type: Optional[MutationManager]
                   formatgraph=None,       # type: Optional[Graph]
                   make_fs_access=None,    # type: Type[StdFsAccess]
                   fs_access=None,         # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,  # type: float
                   debug=runtimeContext.debug,  # type: bool
                   js_console=runtimeContext.js_console,  # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                   loadListing="",         # type: Text
                   outdir="",              # type: Text
                   tmpdir="",              # type: Text
                   stagedir="",            # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                   )

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                sfpath = substitute(primary["location"], pattern)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
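
# Illustrative sketch (comments only, not executed): for a primary File input
#   {"class": "File", "location": "keep:zzzzz/reads.bam"}   # hypothetical location
# and a secondaryFiles spec with pattern ".bai", substitute() yields
# "keep:zzzzz/reads.bam.bai"; a pattern of "^.bai" strips the ".bam" extension
# first, yielding "keep:zzzzz/reads.bai".  Each candidate path is checked with
# fsaccess.exists() and, if found, recorded in primary["secondaryFiles"].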

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
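
    # Illustrative example (comments only): after the loop, uuid_map might be
    #   {"zzzzz-4zz18-zzzzzzzzzzzzzzz": "99999999999999999999999999999999+99"}
    # (hypothetical uuid and portable data hash), so any "keep:<uuid>/..."
    # location can be rewritten below to its content-addressed
    # "keep:<pdh>/..." equivalent.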

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            addkeepref(p["location"])
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid
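
    # Illustrative example (comments only): setloc rewrites a hypothetical
    #   {"class": "File", "location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/a.txt"}
    # to point at the collection's portable data hash,
    #   {"class": "File", "location": "keep:<pdh>/a.txt"}
    # and records the original uuid under the collectionUUID key.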

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if arvrunner.runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(
            arvrunner.api.collections().list,
            filters=[["portable_data_hash", "in", list(keeprefs)],
                     ["owner_uuid", "=", arvrunner.project_uuid]],
            select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": arvrunner.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker,
                                                       arvrunner.runtimeContext.copy_deps)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
                                                       arvrunner.runtimeContext.match_local_docker,
                                                       arvrunner.runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}
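
    # Illustrative example (comments only): pack() records id rewrites such as
    #   {"file:///home/user/tool.cwl": "#tool.cwl"}   # hypothetical paths
    # in 'rewrites'; inverting it lets visit() look up each packed process by
    # its original id when consulting merged_map.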

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError(
                        "Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, v, True,
                    arvrunner.project_uuid,
                    arvrunner.runtimeContext.force_docker_pull,
                    arvrunner.runtimeContext.tmp_outdir_prefix,
                    arvrunner.runtimeContext.match_local_docker,
                    arvrunner.runtimeContext.copy_deps)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
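
# Illustrative example (comments only): upload_workflow_deps builds a
# merged_map keyed by tool id, e.g. (hypothetical values)
#   {"file:///home/user/wf.cwl": FileUpdates(
#        resolved={"file:///home/user/data.txt": "keep:<pdh>/data.txt"},
#        secondaryFiles={})}
# which packed_workflow later consults to rewrite locations.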

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
                                                          arvrunner.runtimeContext.match_local_docker,
                                                          arvrunner.runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
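
# Illustrative usage (comments only, hypothetical version tag):
#   pdh = arvados_jobs_image(arvrunner, "arvados/jobs:2.4.0")
# returns the portable data hash of the collection holding the image (the
# same value stored as dockerCollectionPDH above), raising if the image
# can't be pulled or uploaded.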

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
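
            # Illustrative example (comments only): with record["output"] set
            # to a hypothetical pdh "9999...+99", keepify rewrites a relative
            #   {"class": "File", "location": "foo/bar.txt"}
            # to
            #   {"class": "File", "location": "keep:9999...+99/foo/bar.txt"}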
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)