X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3120003dc579730ac67cac8a47f209b14ec748d3..2feec8cbac7ef28a47f3c3a9d071b070ef38cb6e:/sdk/cwl/arvados_cwl/arvjob.py?ds=inline diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py index 85ada26d23..64cd2aa04e 100644 --- a/sdk/cwl/arvados_cwl/arvjob.py +++ b/sdk/cwl/arvados_cwl/arvjob.py @@ -1,3 +1,7 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + import logging import re import copy @@ -16,6 +20,7 @@ from schema_salad.sourceline import SourceLine import ruamel.yaml as yaml import arvados.collection +from arvados.errors import ApiError from .arvdocker import arv_docker_get_image from .runner import Runner, arvados_jobs_image, packed_workflow, upload_workflow_collection, trim_anonymous_location @@ -123,6 +128,12 @@ class ArvadosJob(object): if not self.arvrunner.ignore_docker_for_reuse: filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]]) + enable_reuse = kwargs.get("enable_reuse", True) + if enable_reuse: + reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement") + if reuse_req: + enable_reuse = reuse_req["enableReuse"] + try: with Perf(metrics, "create %s" % self.name): response = self.arvrunner.api.jobs().create( @@ -136,7 +147,7 @@ class ArvadosJob(object): "runtime_constraints": runtime_constraints }, filters=filters, - find_or_create=kwargs.get("enable_reuse", True) + find_or_create=enable_reuse ).execute(num_retries=self.arvrunner.num_retries) self.arvrunner.processes[response["uuid"]] = self @@ -145,6 +156,22 @@ class ArvadosJob(object): if response["state"] == "Complete": logger.info("%s reused job %s", self.arvrunner.label(self), response["uuid"]) + # Give read permission to the desired project on reused jobs + if response["owner_uuid"] != self.arvrunner.project_uuid: + try: + self.arvrunner.api.links().create(body={ + 'link_class': 'permission', + 'name': 'can_read', + 'tail_uuid': self.arvrunner.project_uuid, + 'head_uuid': response["uuid"], + }).execute(num_retries=self.arvrunner.num_retries) + except ApiError as e: + # The user might not have "manage" access on the job: log + # a message and continue. + logger.info("Creating read permission on job %s: %s", + response["uuid"], + e) + with Perf(metrics, "done %s" % self.name): self.done(response) else: @@ -291,22 +318,6 @@ class RunnerJob(Runner): find_or_create=self.enable_reuse ).execute(num_retries=self.arvrunner.num_retries) - if self.enable_reuse: - reused_collections = [('Output', job['output']), ('Log', job['log'])] - for col_type, pdh in [(n, p) for n, p in reused_collections if p]: - # When reusing jobs, copy its output/log collection to the desired project - c = arvados.collection.Collection(pdh, - api_client=self.arvrunner.api, - keep_client=self.arvrunner.keep_client, - num_retries=self.arvrunner.num_retries) - c.save_new(name="{} of {}".format(col_type, self.name), - owner_uuid=self.arvrunner.project_uuid, - ensure_unique_name=True, - num_retries=self.arvrunner.num_retries) - logger.info("Copied reused job's %s to collection %s", - col_type.lower(), - c.manifest_locator()) - for k,v in job_spec["script_parameters"].items(): if v is False or v is None or isinstance(v, dict): job_spec["script_parameters"][k] = {"value": v}