Merge branch '10088-raw-files' into 10081-cwl-run-same-job
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 19 Sep 2016 15:56:21 +0000 (11:56 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 19 Sep 2016 15:56:21 +0000 (11:56 -0400)
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arv-cwl-schema.yml [new file with mode: 0644]
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/setup.py
sdk/cwl/tests/scatter2.cwl [new file with mode: 0644]
sdk/cwl/tests/test_job.py

index 7bfdba8b80e48d9cb24d7054933b428aa3729764..0c56c8665e8b7bffc8e0cd1cea609800c5057364 100644 (file)
@@ -15,6 +15,7 @@ import pkg_resources  # part of setuptools
 from cwltool.errors import WorkflowException
 import cwltool.main
 import cwltool.workflow
+import schema_salad
 
 import arvados
 import arvados.config
@@ -22,9 +23,11 @@ import arvados.config
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
+from .arvworkflow import ArvadosWorkflow
 from .fsaccess import CollectionFsAccess
-from .arvworkflow import make_workflow
+from .arvworkflow import upload_workflow
 from .perf import Perf
+from cwltool.pack import pack
 
 from cwltool.process import shortname, UnsupportedRequirement
 from cwltool.pathmapper import adjustFileObjs
@@ -53,6 +56,7 @@ class ArvCwlRunner(object):
         self.work_api = work_api
         self.stop_polling = threading.Event()
         self.poll_api = None
+        self.pipeline = None
 
         if self.work_api is None:
             # todo: autodetect API to use.
@@ -61,10 +65,12 @@ class ArvCwlRunner(object):
         if self.work_api not in ("containers", "jobs"):
             raise Exception("Unsupported API '%s'" % self.work_api)
 
-    def arvMakeTool(self, toolpath_object, **kwargs):
+    def arv_make_tool(self, toolpath_object, **kwargs):
+        """Construct the tool object for a CWL document node.
+
+        Stores this runner's ``work_api`` in ``kwargs``, then returns an
+        ArvadosCommandTool for class "CommandLineTool", an ArvadosWorkflow
+        for class "Workflow", and the result of cwltool's defaultMakeTool
+        for anything else.
+        """
+        kwargs["work_api"] = self.work_api
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
-            kwargs["work_api"] = self.work_api
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
+        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
+            return ArvadosWorkflow(self, toolpath_object, **kwargs)
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
@@ -155,18 +161,11 @@ class ArvCwlRunner(object):
             for v in obj:
                 self.check_writable(v)
 
-    def arvExecutor(self, tool, job_order, **kwargs):
+    def arv_executor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
         tool.visit(self.check_writable)
 
-        if kwargs.get("quiet"):
-            logger.setLevel(logging.WARN)
-            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-
-        if self.debug:
-            logger.setLevel(logging.DEBUG)
-
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
@@ -180,7 +179,7 @@ class ArvCwlRunner(object):
             return tmpl.uuid
 
         if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
-            return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
@@ -367,6 +366,16 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     return parser
 
+def add_arv_hints():
+    """Register the Arvados CWL extension names with the CWL v1.0 schema.
+
+    Reads the packaged ``arv-cwl-schema.yml`` resource, loads it through
+    schema_salad using an in-memory cache keyed by the schema URL, and adds
+    each loaded name to the CWL v1.0 name registry under the
+    ``http://arvados.org/cwl#`` prefix, skipping names already registered.
+    """
+    schema_url = "https://w3id.org/cwl/arv-cwl-schema.yml"
+    arv_prefix = "http://arvados.org/cwl#"
+
+    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
+    try:
+        # The packaged schema text is handed to load_schema as its cache.
+        cache = {schema_url: res.read()}
+    finally:
+        # Release the stream even when read() raises.
+        res.close()
+
+    _, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
+    _, extnames, _, _ = schema_salad.schema.load_schema(schema_url, cache=cache)
+    for n in extnames.names:
+        qualified = arv_prefix + n
+        # Only add names the CWL registry does not already contain.
+        if not cwlnames.has_name(qualified, ""):
+            cwlnames.add_name(qualified, "", extnames.get_name(n, ""))
 
 def main(args, stdout, stderr, api_client=None):
     parser = arg_parser()
@@ -376,6 +385,8 @@ def main(args, stdout, stderr, api_client=None):
     if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
         job_order_object = ({}, "")
 
+    add_arv_hints()
+
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
@@ -384,14 +395,21 @@ def main(args, stdout, stderr, api_client=None):
         logger.error(e)
         return 1
 
+    if arvargs.debug:
+        logger.setLevel(logging.DEBUG)
+
+    if arvargs.quiet:
+        logger.setLevel(logging.WARN)
+        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+
     arvargs.conformance_test = None
     arvargs.use_container = True
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
-                             executor=runner.arvExecutor,
-                             makeTool=runner.arvMakeTool,
+                             executor=runner.arv_executor,
+                             makeTool=runner.arv_make_tool,
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
                              make_fs_access=partial(CollectionFsAccess, api_client=api_client))
diff --git a/sdk/cwl/arvados_cwl/arv-cwl-schema.yml b/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
new file mode 100644 (file)
index 0000000..2e5044d
--- /dev/null
@@ -0,0 +1,14 @@
+$base: "http://arvados.org/cwl#"
+$graph:
+- name: RunInSingleContainer
+  type: record
+  doc: |
+    Indicates that a subworkflow should run in a single container
+    and not be scheduled as separate steps.
+  fields:
+    - name: class
+      type: string
+      doc: "Always 'arv:RunInSingleContainer'"
+      jsonldPredicate:
+        _id: "@type"
+        _type: "@vocab"
index e6d0cba65fa00b6c0a185861d3dde89db2c3e4e9..78bca6e0e1ea9ffc49d6b35287556e98284658f5 100644 (file)
@@ -1,6 +1,7 @@
 import logging
 import re
 import copy
+import json
 
 from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
@@ -185,9 +186,11 @@ class ArvadosJob(object):
             except WorkflowException as e:
                 logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
+                outputs = None
             except Exception as e:
                 logger.exception("Got unknown exception while collecting job outputs:")
                 processStatus = "permanentFail"
+                outputs = None
 
             self.output_callback(outputs, processStatus)
         finally:
index 8e45890a8d6226d9387524e1dc330effe900e163..eed4711a23a2153593ca80415bd2ff71cdc7d641 100644 (file)
@@ -1,16 +1,21 @@
 import os
 import json
 import copy
+import logging
 
 from cwltool.pack import pack
 from cwltool.load_tool import fetch_document
 from cwltool.process import shortname
+from cwltool.workflow import Workflow
 
 import ruamel.yaml as yaml
 
 from .runner import upload_docker, upload_dependencies
+from .arvtool import ArvadosCommandTool
 
-def make_workflow(arvRunner, tool, job_order, project_uuid, update_uuid):
+logger = logging.getLogger('arvados.cwl-runner')
+
+def upload_workflow(arvRunner, tool, job_order, project_uuid, update_uuid):
     upload_docker(arvRunner, tool)
 
     document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
@@ -39,3 +44,46 @@ def make_workflow(arvRunner, tool, job_order, project_uuid, update_uuid):
         return arvRunner.api.workflows().update(uuid=update_uuid, body=body).execute(num_retries=arvRunner.num_retries)["uuid"]
     else:
         return arvRunner.api.workflows().create(body=body).execute(num_retries=arvRunner.num_retries)["uuid"]
+
+class ArvadosWorkflow(Workflow):
+    """Wrap cwltool Workflow to override selected methods."""
+
+    def __init__(self, arvrunner, toolpath_object, **kwargs):
+        """Initialize the base Workflow and keep the runner and work API.
+
+        ``kwargs`` must include ``work_api``; a missing key raises KeyError.
+        """
+        super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
+        self.arvrunner = arvrunner
+        self.work_api = kwargs["work_api"]
+
+    def job(self, joborder, output_callback, **kwargs):
+        """Return the job(s) for this workflow.
+
+        Without the ``http://arvados.org/cwl#RunInSingleContainer``
+        requirement/hint this defers to the base Workflow.job().  With it,
+        the workflow is packed into a single document and wrapped in a
+        generated CommandLineTool that runs
+        ``cwltool --no-container --move-outputs workflow.json cwl.input.json``
+        with stdout captured as ``cwl.output.json``; the result of that
+        tool's ``job()`` is returned.
+        """
+        kwargs["work_api"] = self.work_api
+        req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
+        if not req:
+            return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
+
+        uri = self.tool["id"]
+        document_loader = self.doc_loader
+        workflowobj = document_loader.fetch(uri)
+        # NOTE(review): these two assignments modify the object returned by
+        # fetch() in place.  If the loader hands back a shared/cached object,
+        # repeated job() calls would keep prepending the same entries --
+        # confirm whether pack() depends on this in-place change before
+        # switching to a copy.
+        workflowobj["requirements"] = self.requirements + workflowobj.get("requirements", [])
+        workflowobj["hints"] = self.hints + workflowobj.get("hints", [])
+        packed = pack(document_loader, workflowobj, uri, self.metadata)
+
+        # Backslash-escape "$(" and "${" in the serialized workflow;
+        # presumably so the staged file keeps them literally instead of having
+        # them treated as CWL expressions -- TODO confirm.  The backslashes are
+        # written as "\\" so the literals contain no invalid escape sequences.
+        workflow_json = json.dumps(packed, sort_keys=True, indent=4)
+        workflow_json = workflow_json.replace('$(', '\\$(').replace('${', '\\${')
+
+        initial_workdir = {
+            "class": "InitialWorkDirRequirement",
+            "listing": [{
+                "entryname": "workflow.json",
+                "entry": workflow_json
+            }, {
+                "entryname": "cwl.input.json",
+                "entry": "$(JSON.stringify(inputs))"
+            }]
+        }
+        wf_runner = {
+            "class": "CommandLineTool",
+            "baseCommand": "cwltool",
+            "inputs": self.tool["inputs"],
+            "outputs": self.tool["outputs"],
+            "stdout": "cwl.output.json",
+            "requirements": workflowobj["requirements"] + [
+                {"class": "InlineJavascriptRequirement"},
+                initial_workdir,
+            ],
+            "hints": workflowobj["hints"],
+            "arguments": ["--no-container", "--move-outputs", "workflow.json", "cwl.input.json"]
+        }
+        kwargs["loader"] = self.doc_loader
+        kwargs["avsc_names"] = self.doc_schema
+        return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder, output_callback, **kwargs)
index fef9deca96c842f40a1db66f5c25989160798d12..6541ec3d06835876ede00810661d87cf0d6efec9 100644 (file)
@@ -25,6 +25,7 @@ setup(name='arvados-cwl-runner',
       download_url="https://github.com/curoverse/arvados.git",
       license='Apache 2.0',
       packages=find_packages(),
+      package_data={'arvados_cwl': ['arv-cwl-schema.yml']},
       scripts=[
           'bin/cwl-runner',
           'bin/arvados-cwl-runner'
diff --git a/sdk/cwl/tests/scatter2.cwl b/sdk/cwl/tests/scatter2.cwl
new file mode 100644 (file)
index 0000000..2952983
--- /dev/null
@@ -0,0 +1,60 @@
+class: Workflow
+cwlVersion: v1.0
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+inputs:
+  sleeptime:
+    type: int[]
+    default: [44, 29, 14]
+outputs: []
+requirements:
+  SubworkflowFeatureRequirement: {}
+  ScatterFeatureRequirement: {}
+  InlineJavascriptRequirement: {}
+  StepInputExpressionRequirement: {}
+steps:
+  scatterstep:
+    in:
+      sleeptime: sleeptime
+    out: []
+    scatter: sleeptime
+    hints:
+      - class: arv:RunInSingleContainer
+    run:
+      class: Workflow
+      id: mysub
+      inputs:
+        sleeptime: int
+      outputs: []
+      steps:
+        sleep1:
+          in:
+            sleeptime: sleeptime
+          out: [out]
+          run:
+            class: CommandLineTool
+            inputs:
+              sleeptime:
+                type: int
+                inputBinding: {position: 1}
+            outputs:
+              out:
+                type: string
+                outputBinding:
+                  outputEval: "out"
+            baseCommand: sleep
+        sleep2:
+          in:
+            sleeptime:
+              source: sleeptime
+              valueFrom: $(self+1)
+            dep: sleep1/out
+          out: []
+          run:
+            class: CommandLineTool
+            inputs:
+              sleeptime:
+                type: int
+                inputBinding: {position: 1}
+            outputs: []
+            baseCommand: sleep
index 6189a2de351ff44b1d08d8937ca632eef390656a..e95589cf65a3a1e20b7067e454d873a673b6da68 100644 (file)
@@ -1,12 +1,13 @@
-import arvados_cwl
+
 import logging
 import mock
 import unittest
 import os
 import functools
-import cwltool.process
-from schema_salad.ref_resolver import Loader
+import json
 
+import arvados_cwl
+import cwltool.process
 from schema_salad.ref_resolver import Loader
 
 if not os.getenv('ARVADOS_DEBUG'):
@@ -193,3 +194,56 @@ class TestJob(unittest.TestCase):
             mock.call().execute(num_retries=0)])
 
         self.assertFalse(api.collections().create.called)
+
+
+class TestWorkflow(unittest.TestCase):
+    """Check the job request produced for a RunInSingleContainer subworkflow."""
+
+    # The test passes no builder.resources
+    # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+    def test_run(self):
+        """Run two jobs from the scatter workflow and verify the last jobs().create call."""
+        arvados_cwl.add_arv_hints()
+
+        runner = arvados_cwl.ArvCwlRunner(mock.MagicMock())
+        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+        runner.ignore_docker_for_reuse = False
+        document_loader, avsc_names, _, _ = cwltool.process.get_schema("v1.0")
+
+        # NOTE(review): this loads tests/wf/scatter2.cwl, while this change
+        # adds sdk/cwl/tests/scatter2.cwl -- confirm the fixture path.
+        tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
+        metadata["cwlVersion"] = tool["cwlVersion"]
+
+        make_fs_access = functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
+                                              basedir="", make_fs_access=make_fs_access, loader=document_loader,
+                                              makeTool=runner.arv_make_tool, metadata=metadata)
+        arvtool.formatgraph = None
+        it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
+        # Builtin next() instead of the Python 2-only generator .next() method.
+        next(it).run()
+        next(it).run()
+
+        runner.api.jobs().create.assert_called_with(
+            body={
+                'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
+                'repository': 'arvados',
+                'script_version': 'master',
+                'script': 'crunchrunner',
+                'script_parameters': {
+                    'tasks': [{
+                        'task.env': {
+                            'HOME': '$(task.outdir)',
+                            'TMPDIR': '$(task.tmpdir)'},
+                        'task.vwd': {
+                            'workflow.json': '$(task.keep)/f101400a398097d4398cdb3eb5d1a7ca+118/workflow.json',
+                            'cwl.input.json': '$(task.keep)/f101400a398097d4398cdb3eb5d1a7ca+118/cwl.input.json'
+                        },
+                        'command': [u'cwltool', u'--no-container', u'--move-outputs', u'workflow.json', u'cwl.input.json'],
+                        'task.stdout': 'cwl.output.json'}]},
+                'runtime_constraints': {
+                    'min_scratch_mb_per_node': 2048,
+                    'min_cores_per_node': 1,
+                    'docker_image': 'arvados/jobs',
+                    'min_ram_mb_per_node': 1024
+                },
+                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'},
+            filters=[['repository', '=', 'arvados'],
+                     ['script', '=', 'crunchrunner'],
+                     ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
+                     ['docker_image_locator', 'in docker', 'arvados/jobs']],
+            find_or_create=True)