9783: Report useful error if subdirectory of a collection doesn't exist or isn't a
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 7bd9df344cb927713fcafb6d1ff98ad68adb1cc2..27af075f3652b4180e23906ed2fc914812036534 100644 (file)
@@ -27,6 +27,7 @@ from .fsaccess import CollectionFsAccess
 
 from cwltool.process import shortname, UnsupportedRequirement
 from cwltool.pathmapper import adjustFileObjs
+from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -104,9 +105,21 @@ class ArvCwlRunner(object):
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
+    def check_writable(self, obj):
+        """Recursively reject any mapping that has a truthy "writable" entry.
+
+        Used as the callback given to ``tool.visit()`` at the start of
+        ``arvExecutor``.  Descends through nested dicts and lists; every
+        other kind of value is ignored.
+
+        :param obj: a dict, a list, or any other value handed to this
+            callback or reached through nested containers.
+        :raises UnsupportedRequirement: if a dict at any depth has a truthy
+            "writable" value (InitialWorkDir 'writable: true' is not
+            supported).
+        """
+        if isinstance(obj, dict):
+            if obj.get("writable"):
+                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
+            # values() rather than the Python 2-only itervalues(): same
+            # traversal, and it also works on Python 3 mappings.
+            children = obj.values()
+        elif isinstance(obj, list):
+            children = obj
+        else:
+            # Leaf values cannot contain nested entries; nothing to check.
+            return
+        for child in children:
+            self.check_writable(child)
+
     def arvExecutor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
+        tool.visit(self.check_writable)
+
         if kwargs.get("quiet"):
             logger.setLevel(logging.WARN)
             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
@@ -114,6 +127,8 @@ class ArvCwlRunner(object):
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
+        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
+        self.fs_access = make_fs_access(kwargs["basedir"])
 
         if kwargs.get("create_template"):
             tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
@@ -124,7 +139,7 @@ class ArvCwlRunner(object):
         self.debug = kwargs.get("debug")
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
-        self.fs_access = kwargs["make_fs_access"](kwargs["basedir"])
+        kwargs["make_fs_access"] = make_fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
         kwargs["use_container"] = True
         kwargs["tmpdir_prefix"] = "tmp"
@@ -135,6 +150,7 @@ class ArvCwlRunner(object):
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
+            kwargs["docker_tmpdir"] = "/tmp"
         elif self.work_api == "jobs":
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["docker_outdir"] = "$(task.outdir)"
@@ -228,17 +244,7 @@ class ArvCwlRunner(object):
             raise WorkflowException("Workflow did not return a result.")
 
         if kwargs.get("compute_checksum"):
-            def compute_checksums(fileobj):
-                if "checksum" not in fileobj:
-                    checksum = hashlib.sha1()
-                    with self.fs_access.open(fileobj["location"], "rb") as f:
-                        contents = f.read(1024*1024)
-                        while contents != "":
-                            checksum.update(contents)
-                            contents = f.read(1024*1024)
-                    fileobj["checksum"] = "sha1$%s" % checksum.hexdigest()
-
-            adjustFileObjs(self.final_output, compute_checksums)
+            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
         return self.final_output