X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ffef38539dd36e224abb75f0480cff1deb319124..cf9afd31cf9bc682ea3f8c0d59012fa7080b0e9b:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 1261e94a5f..ec9c52c59a 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -21,18 +21,18 @@ import ruamel.yaml as yaml from cwltool.errors import WorkflowException from cwltool.process import UnsupportedRequirement, shortname -from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class -from cwltool.utils import aslist +from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class from cwltool.job import JobBase import arvados.collection from .arvdocker import arv_docker_get_image from . import done -from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields +from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder from .fsaccess import CollectionFetcher from .pathmapper import NoFollowPathMapper, trim_listing from .perf import Perf +from ._version import __version__ logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') @@ -57,7 +57,13 @@ class ArvadosContainer(JobBase): def update_pipeline_component(self, r): pass - def run(self, runtimeContext): + def _required_env(self): + env = {} + env["HOME"] = self.outdir + env["TMPDIR"] = self.tmpdir + return env + + def run(self, toplevelRuntimeContext): # ArvadosCommandTool subclasses from cwltool.CommandLineTool, # which calls makeJobRunner() to get a new ArvadosContainer # object. The fields that define execution such as @@ -67,15 +73,21 @@ class ArvadosContainer(JobBase): runtimeContext = self.job_runtime - container_request = { - "command": self.command_line, - "name": self.name, - "output_path": self.outdir, - "cwd": self.outdir, - "priority": runtimeContext.priority, - "state": "Committed", - "properties": {}, - } + if runtimeContext.submit_request_uuid: + container_request = self.arvrunner.api.container_requests().get( + uuid=runtimeContext.submit_request_uuid + ).execute(num_retries=self.arvrunner.num_retries) + else: + container_request = {} + + container_request["command"] = self.command_line + container_request["name"] = self.name + container_request["output_path"] = self.outdir + container_request["cwd"] = self.outdir + container_request["priority"] = runtimeContext.priority + container_request["state"] = "Committed" + container_request.setdefault("properties", {}) + runtime_constraints = {} if runtimeContext.project_uuid: @@ -123,6 +135,8 @@ class ArvadosContainer(JobBase): "kind": "collection", "portable_data_hash": pdh } + if pdh in self.pathmapper.pdh_to_uuid: + mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh] if len(sp) == 2: if tp == "Directory": path = sp[1] @@ -132,6 +146,8 @@ class ArvadosContainer(JobBase): mounts[targetdir]["path"] = path prevdir = targetdir + "/" + intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) + with Perf(metrics, "generatefiles %s" % self.name): if self.generatefiles["listing"]: vwd = arvados.collection.Collection(api_client=self.arvrunner.api, @@ -147,21 +163,28 @@ class ArvadosContainer(JobBase): with Perf(metrics, "createfiles %s" % self.name): for f, p in sorteditems: if not p.target: - pass - elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"): + continue + + if p.target.startswith("/"): + dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:] + else: + dst = p.target + + if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"): if p.resolved.startswith("_:"): - vwd.mkdirs(p.target) + vwd.mkdirs(dst) else: source, path = self.arvrunner.fs_access.get_collection(p.resolved) - vwd.copy(path, p.target, source_collection=source) + vwd.copy(path or ".", dst, source_collection=source) elif p.type == "CreateFile": if self.arvrunner.secret_store.has_secret(p.resolved): - secret_mounts["%s/%s" % (self.outdir, p.target)] = { + mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target) + secret_mounts[mountpoint] = { "kind": "text", "content": self.arvrunner.secret_store.retrieve(p.resolved) } else: - with vwd.open(p.target, "w") as n: + with vwd.open(dst, "w") as n: n.write(p.resolved) def keepemptydirs(p): @@ -176,22 +199,25 @@ class ArvadosContainer(JobBase): if not runtimeContext.current_container: runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) - info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) - vwd.save_new(name=info["name"], + vwd.save_new(name=intermediate_collection_info["name"], owner_uuid=runtimeContext.project_uuid, ensure_unique_name=True, - trash_at=info["trash_at"], - properties=info["properties"]) + trash_at=intermediate_collection_info["trash_at"], + properties=intermediate_collection_info["properties"]) prev = None for f, p in sorteditems: if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or (prev is not None and p.target.startswith(prev))): continue - mountpoint = "%s/%s" % (self.outdir, p.target) + if p.target.startswith("/"): + dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:] + else: + dst = p.target + mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target) mounts[mountpoint] = {"kind": "collection", "portable_data_hash": vwd.portable_data_hash(), - "path": p.target} + "path": dst} if p.type.startswith("Writable"): mounts[mountpoint]["writable"] = True prev = p.target + "/" @@ -215,13 +241,19 @@ class ArvadosContainer(JobBase): "path": "%s/%s" % (self.outdir, self.stdout)} (docker_req, docker_is_req) = self.get_requirement("DockerRequirement") - if not docker_req: - docker_req = {"dockerImageId": "arvados/jobs"} container_request["container_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, runtimeContext.pull_image, - runtimeContext.project_uuid) + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) + + network_req, _ = self.get_requirement("NetworkAccess") + if network_req: + runtime_constraints["API"] = network_req["networkAccess"] api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement") if api_req: @@ -254,14 +286,41 @@ class ArvadosContainer(JobBase): if self.output_ttl < 0: raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"]) - if self.timelimit is not None: + + if self.arvrunner.api._rootDesc["revision"] >= "20210628": + storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass") + if storage_class_req and storage_class_req.get("intermediateStorageClass"): + container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"]) + else: + container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",") + + cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement") + if cuda_req: + runtime_constraints["cuda"] = { + "device_count": resources.get("cudaDeviceCount", 1), + "driver_version": cuda_req["cudaVersionMin"], + "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0] + } + + if runtimeContext.enable_preemptible is False: + scheduling_parameters["preemptible"] = False + else: + preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible") + if preemptible_req: + scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"] + elif runtimeContext.enable_preemptible is True: + scheduling_parameters["preemptible"] = True + elif runtimeContext.enable_preemptible is None: + pass + + if self.timelimit is not None and self.timelimit > 0: scheduling_parameters["max_run_time"] = self.timelimit extra_submit_params = {} if runtimeContext.submit_runner_cluster: extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster - container_request["output_name"] = "Output for step %s" % (self.name) + container_request["output_name"] = "Output from step %s" % (self.name) container_request["output_ttl"] = self.output_ttl container_request["mounts"] = mounts container_request["secret_mounts"] = secret_mounts @@ -270,11 +329,29 @@ class ArvadosContainer(JobBase): enable_reuse = runtimeContext.enable_reuse if enable_reuse: + reuse_req, _ = self.get_requirement("WorkReuse") + if reuse_req: + enable_reuse = reuse_req["enableReuse"] reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement") if reuse_req: enable_reuse = reuse_req["enableReuse"] container_request["use_existing"] = enable_reuse + properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties") + if properties_req: + for pr in properties_req["processProperties"]: + container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"]) + + output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties") + if output_properties_req: + if self.arvrunner.api._rootDesc["revision"] >= "20220510": + container_request["output_properties"] = {} + for pr in output_properties_req["outputProperties"]: + container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"]) + else: + logger.warning("%s API server is too old to support setting properties on output collections.", + self.arvrunner.label(self)) + if runtimeContext.runnerjob.startswith("arvwf:"): wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")] wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries) @@ -305,7 +382,8 @@ class ArvadosContainer(JobBase): else: logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"]) except Exception as e: - logger.error("%s got error %s" % (self.arvrunner.label(self), str(e))) + logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e) + logger.debug("Container request was %s", container_request) self.output_callback({}, "permanentFail") def done(self, record): @@ -326,11 +404,15 @@ class ArvadosContainer(JobBase): processStatus = "success" else: processStatus = "permanentFail" + + if rcode == 137: + logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.", + self.arvrunner.label(self)) else: processStatus = "permanentFail" - if processStatus == "permanentFail": - logc = arvados.collection.CollectionReader(container["log"], + if processStatus == "permanentFail" and record["log_uuid"]: + logc = arvados.collection.CollectionReader(record["log_uuid"], api_client=self.arvrunner.api, keep_client=self.arvrunner.keep_client, num_retries=self.arvrunner.num_retries) @@ -342,7 +424,7 @@ class ArvadosContainer(JobBase): if record["output_uuid"]: if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl: # Compute the trash time to avoid requesting the collection record. - trash_at = ciso8601.parse_datetime(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl) + trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl) aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else "" orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else "" oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else "" @@ -353,11 +435,13 @@ class ArvadosContainer(JobBase): if container["output"]: outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep") except WorkflowException as e: + # Only include a stack trace if in debug mode. + # A stack trace may obfuscate more useful output about the workflow. logger.error("%s unable to collect output from %s:\n%s", self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False)) processStatus = "permanentFail" - except Exception as e: - logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e) + except Exception: + logger.exception("%s while getting output object:", self.arvrunner.label(self)) processStatus = "permanentFail" finally: self.output_callback(outputs, processStatus) @@ -393,7 +477,7 @@ class RunnerContainer(Runner): "cwd": "/var/spool/cwl", "priority": self.priority, "state": "Committed", - "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image), + "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext), "mounts": { "/var/lib/cwl/cwl.input.json": { "kind": "json", @@ -414,7 +498,7 @@ class RunnerContainer(Runner): "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)), "API": True }, - "use_existing": self.enable_reuse, + "use_existing": False, # Never reuse the runner container - see #15497. "properties": {} } @@ -428,7 +512,7 @@ class RunnerContainer(Runner): "portable_data_hash": "%s" % workflowcollection } else: - packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map) + packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext) workflowpath = "/var/lib/cwl/workflow.json#main" container_req["mounts"]["/var/lib/cwl/workflow.json"] = { "kind": "json", @@ -437,6 +521,11 @@ class RunnerContainer(Runner): if self.embedded_tool.tool.get("id", "").startswith("arvwf:"): container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] + properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties") + if properties_req: + builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata) + for pr in properties_req["processProperties"]: + container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"]) # --local means execute the workflow instead of submitting a container request # --api=containers means use the containers API @@ -451,6 +540,7 @@ class RunnerContainer(Runner): "--api=containers", "--no-log-timestamps", "--disable-validate", + "--disable-color", "--eval-timeout=%s" % self.arvrunner.eval_timeout, "--thread-count=%s" % self.arvrunner.thread_count, "--enable-reuse" if self.enable_reuse else "--disable-reuse", @@ -466,20 +556,32 @@ class RunnerContainer(Runner): if runtimeContext.debug: command.append("--debug") - if runtimeContext.storage_classes != "default": + if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes: command.append("--storage-classes=" + runtimeContext.storage_classes) - if self.on_error: + if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes: + command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes) + + if runtimeContext.on_error: command.append("--on-error=" + self.on_error) - if self.intermediate_output_ttl: - command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl) + if runtimeContext.intermediate_output_ttl: + command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl) - if self.arvrunner.trash_intermediate: + if runtimeContext.trash_intermediate: command.append("--trash-intermediate") - if self.arvrunner.project_uuid: - command.append("--project-uuid="+self.arvrunner.project_uuid) + if runtimeContext.project_uuid: + command.append("--project-uuid="+runtimeContext.project_uuid) + + if self.enable_dev: + command.append("--enable-dev") + + if runtimeContext.enable_preemptible is True: + command.append("--enable-preemptible") + + if runtimeContext.enable_preemptible is False: + command.append("--disable-preemptible") command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"]) @@ -491,8 +593,8 @@ class RunnerContainer(Runner): def run(self, runtimeContext): runtimeContext.keepprefix = "keep:" job_spec = self.arvados_job_spec(runtimeContext) - if self.arvrunner.project_uuid: - job_spec["owner_uuid"] = self.arvrunner.project_uuid + if runtimeContext.project_uuid: + job_spec["owner_uuid"] = runtimeContext.project_uuid extra_submit_params = {} if runtimeContext.submit_runner_cluster: @@ -518,13 +620,25 @@ class RunnerContainer(Runner): logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"]) + workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"] + workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"] + url = "" + if workbench2: + url = "{}processes/{}".format(workbench2, response["uuid"]) + elif workbench1: + url = "{}container_requests/{}".format(workbench1, response["uuid"]) + if url: + logger.info("Monitor workflow progress at %s", url) + + def done(self, record): try: container = self.arvrunner.api.containers().get( uuid=record["container_uuid"] ).execute(num_retries=self.arvrunner.num_retries) - except Exception as e: - logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e) + container["log"] = record["log_uuid"] + except Exception: + logger.exception("%s while getting runner container", self.arvrunner.label(self)) self.arvrunner.output_callback({}, "permanentFail") else: super(RunnerContainer, self).done(container)