X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5c4d9d38dcee73a7ffb6221c80f707c3924da64f..81693ee04f18e68558c9206705ef589cca2460b0:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 03f5a5eb67..5fe29bc8f2 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -2,15 +2,21 @@ # # SPDX-License-Identifier: Apache-2.0 +from future import standard_library +standard_library.install_aliases() +from builtins import str + import logging import json import os -import urllib +import urllib.request, urllib.parse, urllib.error import time import datetime import ciso8601 import uuid +import math +import arvados_cwl.util import ruamel.yaml as yaml from cwltool.errors import WorkflowException @@ -21,7 +27,6 @@ from cwltool.job import JobBase import arvados.collection -from arvados.errors import ApiError from .arvdocker import arv_docker_get_image from . import done from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields @@ -35,7 +40,7 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics') class ArvadosContainer(JobBase): """Submit and manage a Crunch container request for executing a CWL CommandLineTool.""" - def __init__(self, runner, + def __init__(self, runner, job_runtime, builder, # type: Builder joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]] make_path_mapper, # type: Callable[..., PathMapper] @@ -45,6 +50,7 @@ class ArvadosContainer(JobBase): ): super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name) self.arvrunner = runner + self.job_runtime = job_runtime self.running = False self.uuid = None @@ -59,6 +65,8 @@ class ArvadosContainer(JobBase): # ArvadosContainer object by CommandLineTool.job() before # run() is called. + runtimeContext = self.job_runtime + container_request = { "command": self.command_line, "name": self.name, @@ -70,8 +78,8 @@ class ArvadosContainer(JobBase): } runtime_constraints = {} - if self.arvrunner.project_uuid: - container_request["owner_uuid"] = self.arvrunner.project_uuid + if runtimeContext.project_uuid: + container_request["owner_uuid"] = runtimeContext.project_uuid if self.arvrunner.secret_store.has_secret(self.command_line): raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets") @@ -81,17 +89,17 @@ class ArvadosContainer(JobBase): resources = self.builder.resources if resources is not None: - runtime_constraints["vcpus"] = resources.get("cores", 1) - runtime_constraints["ram"] = resources.get("ram") * 2**20 + runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1)) + runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20) mounts = { self.outdir: { "kind": "tmp", - "capacity": resources.get("outdirSize", 0) * 2**20 + "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20) }, self.tmpdir: { "kind": "tmp", - "capacity": resources.get("tmpdirSize", 0) * 2**20 + "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20) } } secret_mounts = {} @@ -129,10 +137,10 @@ class ArvadosContainer(JobBase): vwd = arvados.collection.Collection(api_client=self.arvrunner.api, keep_client=self.arvrunner.keep_client, num_retries=self.arvrunner.num_retries) - generatemapper = NoFollowPathMapper([self.generatefiles], "", "", + generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "", separateDirs=False) - sorteditems = sorted(generatemapper.items(), None, key=lambda n: n[1].target) + sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target) logger.debug("generatemapper is %s", sorteditems) @@ -154,7 +162,7 @@ class ArvadosContainer(JobBase): } else: with vwd.open(p.target, "w") as n: - n.write(p.resolved.encode("utf-8")) + n.write(p.resolved) def keepemptydirs(p): if isinstance(p, arvados.collection.RichCollectionBase): @@ -166,8 +174,11 @@ class ArvadosContainer(JobBase): keepemptydirs(vwd) - info = self._get_intermediate_collection_info() + if not runtimeContext.current_container: + runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) + info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) vwd.save_new(name=info["name"], + owner_uuid=runtimeContext.project_uuid, ensure_unique_name=True, trash_at=info["trash_at"], properties=info["properties"]) @@ -208,9 +219,9 @@ class ArvadosContainer(JobBase): docker_req = {"dockerImageId": "arvados/jobs"} container_request["container_image"] = arv_docker_get_image(self.arvrunner.api, - docker_req, - runtimeContext.pull_image, - self.arvrunner.project_uuid) + docker_req, + runtimeContext.pull_image, + runtimeContext.project_uuid) api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement") if api_req: @@ -219,7 +230,7 @@ class ArvadosContainer(JobBase): runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints") if runtime_req: if "keep_cache" in runtime_req: - runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20 + runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20) if "outputDirType" in runtime_req: if runtime_req["outputDirType"] == "local_output_dir": # Currently the default behavior. @@ -246,6 +257,11 @@ class ArvadosContainer(JobBase): if self.timelimit is not None: scheduling_parameters["max_run_time"] = self.timelimit + extra_submit_params = {} + if runtimeContext.submit_runner_cluster: + extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster + + container_request["output_name"] = "Output for step %s" % (self.name) container_request["output_ttl"] = self.output_ttl container_request["mounts"] = mounts container_request["secret_mounts"] = secret_mounts @@ -272,11 +288,13 @@ class ArvadosContainer(JobBase): if runtimeContext.submit_request_uuid: response = self.arvrunner.api.container_requests().update( uuid=runtimeContext.submit_request_uuid, - body=container_request + body=container_request, + **extra_submit_params ).execute(num_retries=self.arvrunner.num_retries) else: response = self.arvrunner.api.container_requests().create( - body=container_request + body=container_request, + **extra_submit_params ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"] @@ -286,8 +304,8 @@ class ArvadosContainer(JobBase): logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"]) else: logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"]) - except Exception as e: - logger.error("%s got error %s" % (self.arvrunner.label(self), str(e))) + except Exception: + logger.exception("%s got an error", self.arvrunner.label(self)) self.output_callback({}, "permanentFail") def done(self, record): @@ -316,7 +334,10 @@ class ArvadosContainer(JobBase): api_client=self.arvrunner.api, keep_client=self.arvrunner.keep_client, num_retries=self.arvrunner.num_retries) - done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40) + label = self.arvrunner.label(self) + done.logtail( + logc, logger.error, + "%s (%s) error log:" % (label, record["uuid"]), maxlen=40) if record["output_uuid"]: if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl: @@ -332,35 +353,17 @@ class ArvadosContainer(JobBase): if container["output"]: outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep") except WorkflowException as e: + # Only include a stack trace if in debug mode. + # A stack trace may obfuscate more useful output about the workflow. logger.error("%s unable to collect output from %s:\n%s", self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False)) processStatus = "permanentFail" - except Exception as e: - logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e) + except Exception: + logger.exception("%s while getting output object:", self.arvrunner.label(self)) processStatus = "permanentFail" finally: self.output_callback(outputs, processStatus) - def _get_intermediate_collection_info(self): - trash_time = None - if self.arvrunner.intermediate_output_ttl > 0: - trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl) - - current_container_uuid = None - try: - current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries) - current_container_uuid = current_container['uuid'] - except ApiError as e: - # Status code 404 just means we're not running in a container. - if e.resp.status != 404: - logger.info("Getting current container: %s", e) - props = {"type": "Intermediate", - "container": current_container_uuid} - - return {"name" : "Intermediate collection", - "trash_at" : trash_time, - "properties" : props} - class RunnerContainer(Runner): """Submit and manage a container that runs arvados-cwl-runner.""" @@ -409,16 +412,16 @@ class RunnerContainer(Runner): }, "secret_mounts": secret_mounts, "runtime_constraints": { - "vcpus": 1, - "ram": 1024*1024 * self.submit_runner_ram, + "vcpus": math.ceil(self.submit_runner_cores), + "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)), "API": True }, "use_existing": self.enable_reuse, "properties": {} } - if self.tool.tool.get("id", "").startswith("keep:"): - sp = self.tool.tool["id"].split('/') + if self.embedded_tool.tool.get("id", "").startswith("keep:"): + sp = self.embedded_tool.tool["id"].split('/') workflowcollection = sp[0][5:] workflowname = "/".join(sp[1:]) workflowpath = "/var/lib/cwl/workflow/%s" % workflowname @@ -427,14 +430,14 @@ class RunnerContainer(Runner): "portable_data_hash": "%s" % workflowcollection } else: - packed = packed_workflow(self.arvrunner, self.tool, self.merged_map) + packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map) workflowpath = "/var/lib/cwl/workflow.json#main" container_req["mounts"]["/var/lib/cwl/workflow.json"] = { "kind": "json", "content": packed } - if self.tool.tool.get("id", "").startswith("arvwf:"): - container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33] + if self.embedded_tool.tool.get("id", "").startswith("arvwf:"): + container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] # --local means execute the workflow instead of submitting a container request @@ -444,6 +447,7 @@ class RunnerContainer(Runner): # --eval-timeout is the timeout for javascript invocation # --parallel-task-count is the number of threads to use for job submission # --enable/disable-reuse sets desired job reuse + # --collection-cache-size sets aside memory to store collections command = ["arvados-cwl-runner", "--local", "--api=containers", @@ -451,7 +455,8 @@ class RunnerContainer(Runner): "--disable-validate", "--eval-timeout=%s" % self.arvrunner.eval_timeout, "--thread-count=%s" % self.arvrunner.thread_count, - "--enable-reuse" if self.enable_reuse else "--disable-reuse"] + "--enable-reuse" if self.enable_reuse else "--disable-reuse", + "--collection-cache-size=%s" % self.collection_cache_size] if self.output_name: command.append("--output-name=" + self.output_name) @@ -491,28 +496,37 @@ class RunnerContainer(Runner): if self.arvrunner.project_uuid: job_spec["owner_uuid"] = self.arvrunner.project_uuid + extra_submit_params = {} + if runtimeContext.submit_runner_cluster: + extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster + if runtimeContext.submit_request_uuid: + if "cluster_id" in extra_submit_params: + # Doesn't make sense for "update" and actually fails + del extra_submit_params["cluster_id"] response = self.arvrunner.api.container_requests().update( uuid=runtimeContext.submit_request_uuid, - body=job_spec + body=job_spec, + **extra_submit_params ).execute(num_retries=self.arvrunner.num_retries) else: response = self.arvrunner.api.container_requests().create( - body=job_spec + body=job_spec, + **extra_submit_params ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"] self.arvrunner.process_submitted(self) - logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"]) + logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"]) def done(self, record): try: container = self.arvrunner.api.containers().get( uuid=record["container_uuid"] ).execute(num_retries=self.arvrunner.num_retries) - except Exception as e: - logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e) + except Exception: + logger.exception("%s while getting runner container", self.arvrunner.label(self)) self.arvrunner.output_callback({}, "permanentFail") else: super(RunnerContainer, self).done(container)