X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/56555817311d4cadf261cd3aa380361690e6fe65..22fcbed20b2691bcdfa5854004dcb95aa1f2e40d:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 0c7ad732d2..c9170c51b7 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -21,8 +21,7 @@ import ruamel.yaml as yaml from cwltool.errors import WorkflowException from cwltool.process import UnsupportedRequirement, shortname -from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class -from cwltool.utils import aslist +from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class from cwltool.job import JobBase import arvados.collection @@ -33,6 +32,7 @@ from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_ from .fsaccess import CollectionFetcher from .pathmapper import NoFollowPathMapper, trim_listing from .perf import Perf +from ._version import __version__ logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') @@ -149,21 +149,28 @@ class ArvadosContainer(JobBase): with Perf(metrics, "createfiles %s" % self.name): for f, p in sorteditems: if not p.target: - pass - elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"): + continue + + if p.target.startswith("/"): + dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:] + else: + dst = p.target + + if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"): if p.resolved.startswith("_:"): - vwd.mkdirs(p.target) + vwd.mkdirs(dst) else: source, path = self.arvrunner.fs_access.get_collection(p.resolved) - vwd.copy(path, p.target, source_collection=source) + vwd.copy(path or ".", dst, source_collection=source) elif p.type == "CreateFile": if self.arvrunner.secret_store.has_secret(p.resolved): - secret_mounts["%s/%s" % (self.outdir, p.target)] = { + mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target) + secret_mounts[mountpoint] = { "kind": "text", "content": self.arvrunner.secret_store.retrieve(p.resolved) } else: - with vwd.open(p.target, "w") as n: + with vwd.open(dst, "w") as n: n.write(p.resolved) def keepemptydirs(p): @@ -190,10 +197,14 @@ class ArvadosContainer(JobBase): if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or (prev is not None and p.target.startswith(prev))): continue - mountpoint = "%s/%s" % (self.outdir, p.target) + if p.target.startswith("/"): + dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:] + else: + dst = p.target + mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target) mounts[mountpoint] = {"kind": "collection", "portable_data_hash": vwd.portable_data_hash(), - "path": p.target} + "path": dst} if p.type.startswith("Writable"): mounts[mountpoint]["writable"] = True prev = p.target + "/" @@ -218,12 +229,14 @@ class ArvadosContainer(JobBase): (docker_req, docker_is_req) = self.get_requirement("DockerRequirement") if not docker_req: - docker_req = {"dockerImageId": "arvados/jobs"} + docker_req = {"dockerImageId": "arvados/jobs:"+__version__} container_request["container_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, runtimeContext.pull_image, - runtimeContext.project_uuid) + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix) network_req, _ = self.get_requirement("NetworkAccess") if network_req: @@ -260,6 +273,12 @@ class ArvadosContainer(JobBase): if self.output_ttl < 0: raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"]) + storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass") + if storage_class_req and storage_class_req.get("intermediateStorageClass"): + container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"]) + else: + container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",") + if self.timelimit is not None and self.timelimit > 0: scheduling_parameters["max_run_time"] = self.timelimit @@ -276,6 +295,9 @@ class ArvadosContainer(JobBase): enable_reuse = runtimeContext.enable_reuse if enable_reuse: + reuse_req, _ = self.get_requirement("WorkReuse") + if reuse_req: + enable_reuse = reuse_req["enableReuse"] reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement") if reuse_req: enable_reuse = reuse_req["enableReuse"] @@ -310,8 +332,9 @@ class ArvadosContainer(JobBase): logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"]) else: logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"]) - except Exception: - logger.exception("%s got an error", self.arvrunner.label(self)) + except Exception as e: + logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e) + logger.debug("Container request was %s", container_request) self.output_callback({}, "permanentFail") def done(self, record): @@ -422,7 +445,7 @@ class RunnerContainer(Runner): "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)), "API": True }, - "use_existing": self.enable_reuse, + "use_existing": False, # Never reuse the runner container - see #15497. "properties": {} } @@ -459,6 +482,7 @@ class RunnerContainer(Runner): "--api=containers", "--no-log-timestamps", "--disable-validate", + "--disable-color", "--eval-timeout=%s" % self.arvrunner.eval_timeout, "--thread-count=%s" % self.arvrunner.thread_count, "--enable-reuse" if self.enable_reuse else "--disable-reuse", @@ -477,6 +501,9 @@ class RunnerContainer(Runner): if runtimeContext.storage_classes != "default": command.append("--storage-classes=" + runtimeContext.storage_classes) + if runtimeContext.intermediate_storage_classes != "default": + command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes) + if self.on_error: command.append("--on-error=" + self.on_error) @@ -529,6 +556,17 @@ class RunnerContainer(Runner): logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"]) + workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"] + workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"] + url = "" + if workbench2: + url = "{}processes/{}".format(workbench2, response["uuid"]) + elif workbench1: + url = "{}container_requests/{}".format(workbench1, response["uuid"]) + if url: + logger.info("Monitor workflow progress at %s", url) + + def done(self, record): try: container = self.arvrunner.api.containers().get(