9615: Add check_writable to check for "writable" field and raise UnsupportedRequirement.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 4bff093d6b23ddbe505e77d6cea2bf26eb47f8c9..abe916f17ae518a6b8a4259a814f2ba2cd9b64a2 100644 (file)
@@ -8,6 +8,8 @@ import logging
 import os
 import sys
 import threading
+import hashlib
+from functools import partial
 import pkg_resources  # part of setuptools
 
 from cwltool.errors import WorkflowException
@@ -24,6 +26,8 @@ from .arvtool import ArvadosCommandTool
 from .fsaccess import CollectionFsAccess
 
 from cwltool.process import shortname, UnsupportedRequirement
+from cwltool.pathmapper import adjustFileObjs
+from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -101,9 +105,21 @@ class ArvCwlRunner(object):
     def add_uploaded(self, src, pair):
         """Record pair in self.uploaded under the key src."""
         self.uploaded[src] = pair
 
+    def check_writable(self, obj):
+        """Reject any document fragment that sets a truthy "writable" field.
+
+        Recursively walks nested dicts and lists; values of any other
+        type are ignored.
+
+        Raises UnsupportedRequirement if a dict reached at any depth has
+        a truthy value under the "writable" key.
+        """
+        if isinstance(obj, dict):
+            if obj.get("writable"):
+                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
+            # values() instead of the Python 2-only itervalues(), so the
+            # traversal behaves the same under Python 2 and Python 3.
+            for v in obj.values():
+                self.check_writable(v)
+        elif isinstance(obj, list):
+            for v in obj:
+                self.check_writable(v)
+
     def arvExecutor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
+        tool.visit(self.check_writable)
+
         if kwargs.get("quiet"):
             logger.setLevel(logging.WARN)
             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
@@ -111,7 +127,8 @@ class ArvCwlRunner(object):
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
-        self.fs_access = CollectionFsAccess(kwargs["basedir"], api_client=self.api)
+        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
+        self.fs_access = make_fs_access(kwargs["basedir"])
 
         if kwargs.get("create_template"):
             tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
@@ -122,12 +139,12 @@ class ArvCwlRunner(object):
         self.debug = kwargs.get("debug")
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
-        kwargs["fs_access"] = self.fs_access
+        kwargs["make_fs_access"] = make_fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
         kwargs["use_container"] = True
         kwargs["tmpdir_prefix"] = "tmp"
         kwargs["on_error"] = "continue"
-        kwargs["compute_checksum"] = False
+        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
 
         if self.work_api == "containers":
             kwargs["outdir"] = "/var/spool/cwl"
@@ -225,6 +242,9 @@ class ArvCwlRunner(object):
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
+        if kwargs.get("compute_checksum"):
+            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+
         return self.final_output
 
 
@@ -291,6 +311,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=None, dest="work_api",
                         help="Select work submission API, one of 'jobs' or 'containers'.")
 
+    parser.add_argument("--compute-checksum", action="store_true", default=False,
+                        help="Compute checksum of contents while collecting outputs",
+                        dest="compute_checksum")
+
     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
@@ -322,4 +346,5 @@ def main(args, stdout, stderr, api_client=None):
                              executor=runner.arvExecutor,
                              makeTool=runner.arvMakeTool,
                              versionfunc=versionstring,
-                             job_order_object=job_order_object)
+                             job_order_object=job_order_object,
+                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))