Merge branch 'master' into 10194-cwl-version-skew
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 7d5590b952339359d8d6ab540f2fb3fa1eaa9e1a..e11f6a12a679f0d5cf2b6bf6e6b1726d512637a8 100644 (file)
@@ -24,14 +24,16 @@ import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from. runner import Runner, upload_instance
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
 from .perf import Perf
 from .pathmapper import FinalOutputPathMapper
+from ._version import __version__
 
 from cwltool.pack import pack
-from cwltool.process import shortname, UnsupportedRequirement
+from cwltool.process import shortname, UnsupportedRequirement, getListing
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
@@ -47,7 +49,7 @@ class ArvCwlRunner(object):
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None):
+    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
         self.api = api_client
         self.processes = {}
         self.lock = threading.Lock()
@@ -57,22 +59,32 @@ class ArvCwlRunner(object):
         self.uploaded = {}
         self.num_retries = 4
         self.uuid = None
-        self.work_api = work_api
         self.stop_polling = threading.Event()
         self.poll_api = None
         self.pipeline = None
         self.final_output_collection = None
+        self.output_name = output_name
+        self.project_uuid = None
+
         if keep_client is not None:
             self.keep_client = keep_client
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
-        if self.work_api is None:
-            # todo: autodetect API to use.
-            self.work_api = "jobs"
-
-        if self.work_api not in ("containers", "jobs"):
-            raise Exception("Unsupported API '%s'" % self.work_api)
+        for api in ["jobs", "containers"]:  # first match wins when work_api is None
+            try:
+                methods = self.api._rootDesc.get('resources')[api]['methods']
+                if ('httpMethod' in methods['create'] and
+                    (work_api == api or work_api is None)):
+                    self.work_api = api
+                    break
+            except KeyError:
+                pass
+        else:  # loop ended without break: self.work_api was never assigned, so do not read it
+            if work_api is None:
+                raise Exception("No supported APIs")
+            else:
+                raise Exception("Unsupported API '%s'" % work_api)
 
     def arv_make_tool(self, toolpath_object, **kwargs):
         kwargs["work_api"] = self.work_api
@@ -217,7 +229,9 @@ class ArvCwlRunner(object):
 
         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
 
-        logger.info("Final output collection %s (%s)", final.portable_data_hash(), final.manifest_locator())
+        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
+                    final.api_response()["name"],
+                    final.manifest_locator())
 
         self.final_output_collection = final
 
@@ -262,6 +276,8 @@ class ArvCwlRunner(object):
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
+        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
+
         runnerjob = None
         if kwargs.get("submit"):
             if self.work_api == "containers":
@@ -270,9 +286,9 @@ class ArvCwlRunner(object):
                                          self.output_callback,
                                          **kwargs).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
             else:
-                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
+                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
 
         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
             # Create pipeline for local run
@@ -285,7 +301,7 @@ class ArvCwlRunner(object):
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
-            runnerjob.run()
+            runnerjob.run(wait=kwargs.get("wait"))
             return runnerjob.uuid
 
         self.poll_api = arvados.api('v1')
@@ -353,13 +369,15 @@ class ArvCwlRunner(object):
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
-        if kwargs.get("submit"):
+        if kwargs.get("submit") and isinstance(runnerjob, Runner):
             logger.info("Final output collection %s", runnerjob.final_output)
         else:
-            self.make_output_collection("Output of %s" % (shortname(tool.tool["id"])),
-                                        self.final_output)
+            if self.output_name is None:
+                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
+            self.make_output_collection(self.output_name, self.final_output)
 
         if kwargs.get("compute_checksum"):
+            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
         return self.final_output
@@ -372,7 +390,7 @@ def versionstring():
     arvpkg = pkg_resources.require("arvados-python-client")
     cwlpkg = pkg_resources.require("cwltool")
 
-    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
+    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                     "arvados-python-client", arvpkg[0].version,
                                     "cwltool", cwlpkg[0].version)
 
@@ -409,6 +427,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="")
 
     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
+    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
@@ -466,7 +485,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
-        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client)
+        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
     except Exception as e:
         logger.error(e)
         return 1