X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/46df7c2b292e5f12da365b918b0f28757eb4c4ce..331db3ab818292057af3c39e18bd76d654d9fab5:/sdk/cwl/arvados_cwl/__init__.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 7bd9df344c..27af075f36 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -27,6 +27,7 @@ from .fsaccess import CollectionFsAccess
 
 from cwltool.process import shortname, UnsupportedRequirement
 from cwltool.pathmapper import adjustFileObjs
+from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -104,9 +105,21 @@ class ArvCwlRunner(object):
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
+    def check_writable(self, obj):
+        if isinstance(obj, dict):
+            if obj.get("writable"):
+                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
+            for v in obj.itervalues():
+                self.check_writable(v)
+        if isinstance(obj, list):
+            for v in obj:
+                self.check_writable(v)
+
     def arvExecutor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
+        tool.visit(self.check_writable)
+
         if kwargs.get("quiet"):
             logger.setLevel(logging.WARN)
             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
@@ -114,6 +127,8 @@ class ArvCwlRunner(object):
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
+        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
+        self.fs_access = make_fs_access(kwargs["basedir"])
 
         if kwargs.get("create_template"):
             tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
@@ -124,7 +139,7 @@ class ArvCwlRunner(object):
         self.debug = kwargs.get("debug")
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
-        self.fs_access = kwargs["make_fs_access"](kwargs["basedir"])
+        kwargs["make_fs_access"] = make_fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
         kwargs["use_container"] = True
         kwargs["tmpdir_prefix"] = "tmp"
@@ -135,6 +150,7 @@ class ArvCwlRunner(object):
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
+            kwargs["docker_tmpdir"] = "/tmp"
         elif self.work_api == "jobs":
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["docker_outdir"] = "$(task.outdir)"
@@ -228,17 +244,7 @@ class ArvCwlRunner(object):
             raise WorkflowException("Workflow did not return a result.")
 
         if kwargs.get("compute_checksum"):
-            def compute_checksums(fileobj):
-                if "checksum" not in fileobj:
-                    checksum = hashlib.sha1()
-                    with self.fs_access.open(fileobj["location"], "rb") as f:
-                        contents = f.read(1024*1024)
-                        while contents != "":
-                            checksum.update(contents)
-                            contents = f.read(1024*1024)
-                    fileobj["checksum"] = "sha1$%s" % checksum.hexdigest()
-
-            adjustFileObjs(self.final_output, compute_checksums)
+            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
         return self.final_output
 