9751: Override realpath in CollectionFsAccess
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 69c933c15debff5959478d76aa8c560640d3d950..fd9e74f8575167f7aa003ef32d9c449596810408 100644 (file)
@@ -8,6 +8,8 @@ import logging
 import os
 import sys
 import threading
+import hashlib
+from functools import partial
 import pkg_resources  # part of setuptools
 
 from cwltool.errors import WorkflowException
@@ -16,6 +18,7 @@ import cwltool.workflow
 
 import arvados
 import arvados.events
+import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
@@ -23,6 +26,7 @@ from .arvtool import ArvadosCommandTool
 from .fsaccess import CollectionFsAccess
 
 from cwltool.process import shortname, UnsupportedRequirement
+from cwltool.pathmapper import adjustFileObjs
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -34,7 +38,7 @@ class ArvCwlRunner(object):
 
     """
 
-    def __init__(self, api_client, work_api):
+    def __init__(self, api_client, work_api=None):
         self.api = api_client
         self.processes = {}
         self.lock = threading.Lock()
@@ -46,6 +50,10 @@ class ArvCwlRunner(object):
         self.uuid = None
         self.work_api = work_api
 
+        if self.work_api is None:
+            # todo: autodetect API to use.
+            self.work_api = "jobs"
+
         if self.work_api not in ("containers", "jobs"):
             raise Exception("Unsupported API '%s'" % self.work_api)
 
@@ -115,16 +123,23 @@ class ArvCwlRunner(object):
 
         self.debug = kwargs.get("debug")
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
-        self.fs_access = CollectionFsAccess(kwargs["basedir"])
 
-        kwargs["fs_access"] = self.fs_access
+        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
+        self.fs_access = make_fs_access(kwargs["basedir"])
+        kwargs["make_fs_access"] = make_fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
+        kwargs["use_container"] = True
+        kwargs["tmpdir_prefix"] = "tmp"
+        kwargs["on_error"] = "continue"
+        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
 
         if self.work_api == "containers":
             kwargs["outdir"] = "/var/spool/cwl"
+            kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
         elif self.work_api == "jobs":
             kwargs["outdir"] = "$(task.outdir)"
+            kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
         runnerjob = None
@@ -153,6 +168,8 @@ class ArvCwlRunner(object):
             runnerjob.run()
             return runnerjob.uuid
 
+        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"
+
         if self.work_api == "containers":
             events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
         if self.work_api == "jobs":
@@ -165,7 +182,6 @@ class ArvCwlRunner(object):
                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
             jobiter = tool.job(job_order,
                                self.output_callback,
-                               docker_outdir="$(task.outdir)",
                                **kwargs)
 
         try:
@@ -207,8 +223,24 @@ class ArvCwlRunner(object):
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
 
+        if self.final_status != "success":
+            raise WorkflowException("Workflow failed.")
+
         if self.final_output is None:
-            raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+            raise WorkflowException("Workflow did not return a result.")
+
+        if kwargs.get("compute_checksum"):
+            def compute_checksums(fileobj):
+                if "checksum" not in fileobj:
+                    checksum = hashlib.sha1()
+                    with self.fs_access.open(fileobj["location"], "rb") as f:
+                        contents = f.read(1024*1024)
+                        while contents != "":
+                            checksum.update(contents)
+                            contents = f.read(1024*1024)
+                    fileobj["checksum"] = "sha1$%s" % checksum.hexdigest()
+
+            adjustFileObjs(self.final_output, compute_checksums)
 
         return self.final_output
 
@@ -276,6 +308,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=None, dest="work_api",
                         help="Select work submission API, one of 'jobs' or 'containers'.")
 
+    parser.add_argument("--compute-checksum", action="store_true", default=False,
+                        help="Compute checksum of contents while collecting outputs",
+                        dest="compute_checksum")
+
     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
@@ -290,9 +326,6 @@ def main(args, stdout, stderr, api_client=None):
     if arvargs.create_template and not arvargs.job_order:
         job_order_object = ({}, "")
 
-    if arvargs.work_api is None:
-        arvargs.work_api = "jobs"
-
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
@@ -302,6 +335,7 @@ def main(args, stdout, stderr, api_client=None):
         return 1
 
     arvargs.conformance_test = None
+    arvargs.use_container = True
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
@@ -309,4 +343,5 @@ def main(args, stdout, stderr, api_client=None):
                              executor=runner.arvExecutor,
                              makeTool=runner.arvMakeTool,
                              versionfunc=versionstring,
-                             job_order_object=job_order_object)
+                             job_order_object=job_order_object,
+                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))