1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
12 import urllib.request, urllib.parse, urllib.error
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
27 import arvados.collection
29 from .arvdocker import arv_docker_get_image
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
# Module-level loggers: "arvados.cwl-runner" for normal progress/error
# messages, and a separate "…​.metrics" child used by the Perf timing
# context manager below for performance measurements.
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
# NOTE(review): this listing is elided (gaps in the embedded original line
# numbers) and leading indentation has been stripped, so code lines are
# reproduced byte-for-byte; only comments are added.
40 class ArvadosContainer(JobBase):
41 """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
# __init__: delegate the standard cwltool job fields to JobBase, then keep
# a reference to the CWL runner (API clients, secret store, callbacks) and
# the per-job RuntimeContext used throughout run().
# NOTE(review): the `name` argument passed to super() is bound on a line
# elided from this view (presumably a constructor parameter) -- confirm in
# the full source.
43 def __init__(self, runner, job_runtime,
44 builder, # type: Builder
45 joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
46 make_path_mapper, # type: Callable[..., PathMapper]
47 requirements, # type: List[Dict[Text, Text]]
48 hints, # type: List[Dict[Text, Text]]
51 super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
52 self.arvrunner = runner
53 self.job_runtime = job_runtime
# update_pipeline_component: body elided from this listing -- behavior
# cannot be documented from here; see the full source.
57 def update_pipeline_component(self, r):
# _required_env: builds the environment the container must receive; HOME
# points at the job's output directory and TMPDIR at its temp directory.
# NOTE(review): the `env` dict initialization and the return statement are
# elided from this listing.
60 def _required_env(self):
62 env["HOME"] = self.outdir
63 env["TMPDIR"] = self.tmpdir
# run: translate this CWL CommandLineTool invocation into an Arvados
# container request and submit it (create, or update when a
# submit_request_uuid was supplied). On any submission failure, reports
# "permanentFail" through the output callback instead of raising.
# NOTE(review): many lines are elided from this listing (gaps in the
# embedded numbering); code lines below are byte-identical to the view.
66 def run(self, toplevelRuntimeContext):
67 # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
68 # which calls makeJobRunner() to get a new ArvadosContainer
69 # object. The fields that define execution such as
70 # command_line, environment, etc are set on the
71 # ArvadosContainer object by CommandLineTool.job() before
74 runtimeContext = self.job_runtime
# If updating an existing request, start from its current record;
# otherwise build a fresh request body (the `else:` line is elided).
76 if runtimeContext.submit_request_uuid:
77 container_request = self.arvrunner.api.container_requests().get(
78 uuid=runtimeContext.submit_request_uuid
79 ).execute(num_retries=self.arvrunner.num_retries)
81 container_request = {}
83 container_request["command"] = self.command_line
84 container_request["name"] = self.name
85 container_request["output_path"] = self.outdir
86 container_request["cwd"] = self.outdir
87 container_request["priority"] = runtimeContext.priority
88 container_request["state"] = "Committed"
89 container_request.setdefault("properties", {})
91 runtime_constraints = {}
93 if runtimeContext.project_uuid:
94 container_request["owner_uuid"] = runtimeContext.project_uuid
# Secrets may only appear via file literals (secret_mounts); refuse to
# submit if any secret leaked into the command line or environment.
96 if self.arvrunner.secret_store.has_secret(self.command_line):
97 raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
99 if self.arvrunner.secret_store.has_secret(self.environment):
100 raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")
# ResourceRequirement -> runtime constraints; ram/outdirSize/tmpdirSize
# are in MiB, hence the 2**20 scaling.
102 resources = self.builder.resources
103 if resources is not None:
104 runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
105 runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
110 "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
114 "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
118 scheduling_parameters = {}
# Mount each referenced keep: collection at the directory containing its
# target path; sorting lets descendants of an already-mounted directory
# (prevdir prefix match) be skipped.
120 rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
121 rf.sort(key=lambda k: k.resolved)
123 for resolved, target, tp, stg in rf:
126 if prevdir and target.startswith(prevdir):
128 if tp == "Directory":
131 targetdir = os.path.dirname(target)
132 sp = resolved.split("/", 1)
133 pdh = sp[0][5:] # remove "keep:"
134 mounts[targetdir] = {
135 "kind": "collection",
136 "portable_data_hash": pdh
138 if pdh in self.pathmapper.pdh_to_uuid:
139 mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
141 if tp == "Directory":
144 path = os.path.dirname(sp[1])
145 if path and path != "/":
146 mounts[targetdir]["path"] = path
147 prevdir = targetdir + "/"
# InitialWorkDirRequirement: stage generated files into a new collection
# (vwd); CreateFile entries holding secrets go to secret_mounts instead.
149 with Perf(metrics, "generatefiles %s" % self.name):
150 if self.generatefiles["listing"]:
151 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
152 keep_client=self.arvrunner.keep_client,
153 num_retries=self.arvrunner.num_retries)
154 generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
157 sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)
159 logger.debug("generatemapper is %s", sorteditems)
161 with Perf(metrics, "createfiles %s" % self.name):
162 for f, p in sorteditems:
166 if p.target.startswith("/"):
167 dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
171 if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
172 if p.resolved.startswith("_:"):
175 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
176 vwd.copy(path or ".", dst, source_collection=source)
177 elif p.type == "CreateFile":
178 if self.arvrunner.secret_store.has_secret(p.resolved):
179 mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
180 secret_mounts[mountpoint] = {
182 "content": self.arvrunner.secret_store.retrieve(p.resolved)
185 with vwd.open(dst, "w") as n:
# Empty directories would vanish from the collection manifest; add a
# ".keep" placeholder file so they survive.
188 def keepemptydirs(p):
189 if isinstance(p, arvados.collection.RichCollectionBase):
191 p.open(".keep", "w").close()
# Save the staged collection as a named intermediate, tagged with
# trash-at/properties derived from the intermediate-output TTL.
198 if not runtimeContext.current_container:
199 runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
200 info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
201 vwd.save_new(name=info["name"],
202 owner_uuid=runtimeContext.project_uuid,
203 ensure_unique_name=True,
204 trash_at=info["trash_at"],
205 properties=info["properties"])
# Second pass: mount each staged item from the saved collection; skip
# secrets (already in secret_mounts) and children of a mounted prefix.
208 for f, p in sorteditems:
209 if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
210 (prev is not None and p.target.startswith(prev))):
212 if p.target.startswith("/"):
213 dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
216 mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
217 mounts[mountpoint] = {"kind": "collection",
218 "portable_data_hash": vwd.portable_data_hash(),
220 if p.type.startswith("Writable"):
221 mounts[mountpoint]["writable"] = True
222 prev = p.target + "/"
# Container environment: required vars first, then tool-specified ones
# (EnvVarRequirement) layered on top.
224 container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
226 container_request["environment"].update(self.environment)
# stdin comes from a keep: path ([6:] strips the "keep:" prefix);
# stdout/stderr are redirected to files under the output directory.
229 sp = self.stdin[6:].split("/", 1)
230 mounts["stdin"] = {"kind": "collection",
231 "portable_data_hash": sp[0],
235 mounts["stderr"] = {"kind": "file",
236 "path": "%s/%s" % (self.outdir, self.stderr)}
239 mounts["stdout"] = {"kind": "file",
240 "path": "%s/%s" % (self.outdir, self.stdout)}
# Resolve the Docker image to an Arvados-stored image for the request.
242 (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
244 container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
246 runtimeContext.pull_image,
247 runtimeContext.project_uuid,
248 runtimeContext.force_docker_pull,
249 runtimeContext.tmp_outdir_prefix,
250 runtimeContext.match_local_docker)
# Hints/requirements -> runtime constraints and scheduling parameters.
252 network_req, _ = self.get_requirement("NetworkAccess")
254 runtime_constraints["API"] = network_req["networkAccess"]
256 api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
258 runtime_constraints["API"] = True
260 runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
262 if "keep_cache" in runtime_req:
263 runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
264 if "outputDirType" in runtime_req:
265 if runtime_req["outputDirType"] == "local_output_dir":
266 # Currently the default behavior.
268 elif runtime_req["outputDirType"] == "keep_output_dir":
269 mounts[self.outdir]= {
270 "kind": "collection",
274 partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
276 scheduling_parameters["partitions"] = aslist(partition_req["partition"])
278 intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
279 if intermediate_output_req:
280 self.output_ttl = intermediate_output_req["outputTTL"]
282 self.output_ttl = self.arvrunner.intermediate_output_ttl
284 if self.output_ttl < 0:
285 raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
# output_storage_classes only exists on API servers at or past the
# 20210628 schema revision; hint takes precedence over the CLI setting.
288 if self.arvrunner.api._rootDesc["revision"] >= "20210628":
289 storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
290 if storage_class_req and storage_class_req.get("intermediateStorageClass"):
291 container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
293 container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
295 cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
297 runtime_constraints["cuda"] = {
298 "device_count": resources.get("cudaDeviceCount", 1),
299 "driver_version": cuda_req["cudaVersionMin"],
300 "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
# Preemptible scheduling: an explicit CLI False wins outright; True or
# unset defer to the UsePreemptible hint (branches partly elided).
303 if runtimeContext.enable_preemptible is False:
304 scheduling_parameters["preemptible"] = False
306 preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
308 scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
309 elif runtimeContext.enable_preemptible is True:
310 scheduling_parameters["preemptible"] = True
311 elif runtimeContext.enable_preemptible is None:
314 if self.timelimit is not None and self.timelimit > 0:
315 scheduling_parameters["max_run_time"] = self.timelimit
317 extra_submit_params = {}
318 if runtimeContext.submit_runner_cluster:
319 extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
321 container_request["output_name"] = "Output for step %s" % (self.name)
322 container_request["output_ttl"] = self.output_ttl
323 container_request["mounts"] = mounts
324 container_request["secret_mounts"] = secret_mounts
325 container_request["runtime_constraints"] = runtime_constraints
326 container_request["scheduling_parameters"] = scheduling_parameters
# Container reuse: CLI default, overridden by WorkReuse and then the
# legacy arvados.org ReuseRequirement hint.
328 enable_reuse = runtimeContext.enable_reuse
330 reuse_req, _ = self.get_requirement("WorkReuse")
332 enable_reuse = reuse_req["enableReuse"]
333 reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
335 enable_reuse = reuse_req["enableReuse"]
336 container_request["use_existing"] = enable_reuse
# ProcessProperties hint: evaluated expressions become request properties.
338 properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
340 for pr in properties_req["processProperties"]:
341 container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
# When launched from a registered workflow ("arvwf:<uuid>#..."), label the
# request with the workflow's name and record the template uuid.
343 if runtimeContext.runnerjob.startswith("arvwf:"):
344 wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
345 wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
346 if container_request["name"] == "main":
347 container_request["name"] = wfrecord["name"]
348 container_request["properties"]["template_uuid"] = wfuuid
350 self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
# Submit: update the existing request or create a new one, then register
# with the runner so its state is polled (the `try:` line is elided).
353 if runtimeContext.submit_request_uuid:
354 response = self.arvrunner.api.container_requests().update(
355 uuid=runtimeContext.submit_request_uuid,
356 body=container_request,
357 **extra_submit_params
358 ).execute(num_retries=self.arvrunner.num_retries)
360 response = self.arvrunner.api.container_requests().create(
361 body=container_request,
362 **extra_submit_params
363 ).execute(num_retries=self.arvrunner.num_retries)
365 self.uuid = response["uuid"]
366 self.arvrunner.process_submitted(self)
368 if response["state"] == "Final":
369 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
371 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
# Any submission error is terminal for this step: log it and report
# permanentFail rather than propagating.
372 except Exception as e:
373 logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
374 logger.debug("Container request was %s", container_request)
375 self.output_callback({}, "permanentFail")
# done: called when the container request reaches a final state. Fetches
# the container record, maps its exit code through success/temporaryFail/
# permanentFail code lists, surfaces logs on failure, schedules trashing
# of intermediate output, collects outputs, and fires the output callback.
# NOTE(review): the surrounding try/except/finally and several `else:`
# lines are elided from this listing.
377 def done(self, record):
380 container = self.arvrunner.api.containers().get(
381 uuid=record["container_uuid"]
382 ).execute(num_retries=self.arvrunner.num_retries)
383 if container["state"] == "Complete":
384 rcode = container["exit_code"]
# Exit-code classification honors the tool's successCodes /
# temporaryFailCodes / permanentFailCodes hints before the default.
385 if self.successCodes and rcode in self.successCodes:
386 processStatus = "success"
387 elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
388 processStatus = "temporaryFail"
389 elif self.permanentFailCodes and rcode in self.permanentFailCodes:
390 processStatus = "permanentFail"
392 processStatus = "success"
394 processStatus = "permanentFail"
# Presumably reached when the container was cancelled/killed (guard
# elided) -- OOM is the most common cause, hence the hint to raise ramMin.
397 logger.warning("%s job was killed on the compute instance. The most common reason is that it attempted to allocate too much RAM and was targeted by the Out Of Memory (OOM) killer. Try resubmitting with a higher 'ramMin'.",
398 self.arvrunner.label(self))
400 processStatus = "permanentFail"
# On permanent failure, pull the tail of the container log collection so
# the user sees the error context.
402 if processStatus == "permanentFail" and record["log_uuid"]:
403 logc = arvados.collection.CollectionReader(record["log_uuid"],
404 api_client=self.arvrunner.api,
405 keep_client=self.arvrunner.keep_client,
406 num_retries=self.arvrunner.num_retries)
407 label = self.arvrunner.label(self)
410 "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
412 if record["output_uuid"]:
413 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
414 # Compute the trash time to avoid requesting the collection record.
415 trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
416 aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
417 orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
418 oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
419 logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
420 self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
421 self.arvrunner.add_intermediate_output(record["output_uuid"])
423 if container["output"]:
424 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
425 except WorkflowException as e:
426 # Only include a stack trace if in debug mode.
427 # A stack trace may obfuscate more useful output about the workflow.
428 logger.error("%s unable to collect output from %s:\n%s",
429 self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
430 processStatus = "permanentFail"
432 logger.exception("%s while getting output object:", self.arvrunner.label(self))
433 processStatus = "permanentFail"
# Always reported (presumably in a `finally:` block, elided here).
435 self.output_callback(outputs, processStatus)
# NOTE(review): elided listing -- code lines below are byte-identical;
# only comments are added.
438 class RunnerContainer(Runner):
439 """Submit and manage a container that runs arvados-cwl-runner."""
441 def arvados_job_spec(self, runtimeContext):
442 """Create an Arvados container request for this workflow.
444 The returned dict can be used to create a container passed as
445 the +body+ argument to container_requests().create().
# Normalize the job order: drop listings/locations/redundant fields that
# would bloat or break the serialized cwl.input.json.
448 adjustDirObjs(self.job_order, trim_listing)
449 visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
450 visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
# Move secret parameter values out of the input document into numbered
# secret mounts, leaving $include references in their place.
453 for param in sorted(self.job_order.keys()):
454 if self.secret_store.has_secret(self.job_order[param]):
455 mnt = "/secrets/s%d" % len(secret_mounts)
456 secret_mounts[mnt] = {
458 "content": self.secret_store.retrieve(self.job_order[param])
460 self.job_order[param] = {"$include": mnt}
# Base container request for the runner container itself (dict literal
# partially elided: command/name/mount "kind" keys are missing here).
464 "output_path": "/var/spool/cwl",
465 "cwd": "/var/spool/cwl",
466 "priority": self.priority,
467 "state": "Committed",
468 "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
470 "/var/lib/cwl/cwl.input.json": {
472 "content": self.job_order
476 "path": "/var/spool/cwl/cwl.output.json"
479 "kind": "collection",
483 "secret_mounts": secret_mounts,
484 "runtime_constraints": {
# RAM constraint covers the runner itself plus its collection cache;
# submit_runner_ram and collection_cache_size are in MiB.
485 "vcpus": math.ceil(self.submit_runner_cores),
486 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
489 "use_existing": False, # Never reuse the runner container - see #15497.
# Workflow source: either an existing keep: collection mounted read-only,
# or (else branch, partly elided) the tool packed into workflow.json.
493 if self.embedded_tool.tool.get("id", "").startswith("keep:"):
494 sp = self.embedded_tool.tool["id"].split('/')
495 workflowcollection = sp[0][5:]
496 workflowname = "/".join(sp[1:])
497 workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
498 container_req["mounts"]["/var/lib/cwl/workflow"] = {
499 "kind": "collection",
500 "portable_data_hash": "%s" % workflowcollection
503 packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
504 workflowpath = "/var/lib/cwl/workflow.json#main"
505 container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
# "arvwf:<uuid>..." ids carry the registered-workflow uuid in chars 6:33.
509 if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
510 container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
512 properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
514 builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
515 for pr in properties_req["processProperties"]:
516 container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
518 # --local means execute the workflow instead of submitting a container request
519 # --api=containers means use the containers API
520 # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
521 # --disable-validate because we already validated so don't need to do it again
522 # --eval-timeout is the timeout for javascript invocation
523 # --parallel-task-count is the number of threads to use for job submission
524 # --enable/disable-reuse sets desired job reuse
525 # --collection-cache-size sets aside memory to store collections
526 command = ["arvados-cwl-runner",
529 "--no-log-timestamps",
530 "--disable-validate",
532 "--eval-timeout=%s" % self.arvrunner.eval_timeout,
533 "--thread-count=%s" % self.arvrunner.thread_count,
534 "--enable-reuse" if self.enable_reuse else "--disable-reuse",
535 "--collection-cache-size=%s" % self.collection_cache_size]
# Optional flags mirror the CLI/runtime configuration of this submission
# (several guarding `if` lines are elided from this listing).
538 command.append("--output-name=" + self.output_name)
539 container_req["output_name"] = self.output_name
542 command.append("--output-tags=" + self.output_tags)
544 if runtimeContext.debug:
545 command.append("--debug")
547 if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
548 command.append("--storage-classes=" + runtimeContext.storage_classes)
550 if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
551 command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
554 command.append("--on-error=" + self.on_error)
556 if self.intermediate_output_ttl:
557 command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
559 if self.arvrunner.trash_intermediate:
560 command.append("--trash-intermediate")
562 if self.arvrunner.project_uuid:
563 command.append("--project-uuid="+self.arvrunner.project_uuid)
566 command.append("--enable-dev")
568 if runtimeContext.enable_preemptible is True:
569 command.append("--enable-preemptible")
571 if runtimeContext.enable_preemptible is False:
572 command.append("--disable-preemptible")
# Positional args: the workflow to run and its staged input document.
574 command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
576 container_req["command"] = command
# run: build the runner container request via arvados_job_spec() and
# submit it (update when a submit_request_uuid exists, otherwise create),
# then log a Workbench URL where progress can be monitored.
# NOTE(review): some lines (e.g. the `body=` kwargs of the API calls and
# the workbench if/else guards) are elided from this listing.
581 def run(self, runtimeContext):
582 runtimeContext.keepprefix = "keep:"
583 job_spec = self.arvados_job_spec(runtimeContext)
584 if self.arvrunner.project_uuid:
585 job_spec["owner_uuid"] = self.arvrunner.project_uuid
587 extra_submit_params = {}
588 if runtimeContext.submit_runner_cluster:
589 extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
591 if runtimeContext.submit_request_uuid:
592 if "cluster_id" in extra_submit_params:
593 # Doesn't make sense for "update" and actually fails
594 del extra_submit_params["cluster_id"]
595 response = self.arvrunner.api.container_requests().update(
596 uuid=runtimeContext.submit_request_uuid,
598 **extra_submit_params
599 ).execute(num_retries=self.arvrunner.num_retries)
601 response = self.arvrunner.api.container_requests().create(
603 **extra_submit_params
604 ).execute(num_retries=self.arvrunner.num_retries)
606 self.uuid = response["uuid"]
607 self.arvrunner.process_submitted(self)
609 logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
# Prefer the Workbench2 URL when configured, else fall back to Workbench1
# (the guards choosing between them are elided here).
611 workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
612 workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
615 url = "{}processes/{}".format(workbench2, response["uuid"])
617 url = "{}container_requests/{}".format(workbench1, response["uuid"])
619 logger.info("Monitor workflow progress at %s", url)
# done: final-state handler for the runner container request. Fetches the
# container record, attaches the request's log collection uuid, and hands
# off to Runner.done(); API errors report permanentFail instead of raising.
# NOTE(review): the try/except/else structure is elided from this listing.
622 def done(self, record):
624 container = self.arvrunner.api.containers().get(
625 uuid=record["container_uuid"]
626 ).execute(num_retries=self.arvrunner.num_retries)
627 container["log"] = record["log_uuid"]
629 logger.exception("%s while getting runner container", self.arvrunner.label(self))
630 self.arvrunner.output_callback({}, "permanentFail")
632 super(RunnerContainer, self).done(container)