10576: Running jobs from keep: and arv: prefixes WIP. Tests passing, needs some...
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 5 Dec 2016 22:06:37 +0000 (17:06 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 5 Dec 2016 22:06:37 +0000 (17:06 -0500)
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/wf/expect_arvworkflow.cwl [new file with mode: 0644]

index c953b4e5bae34591a5c27754d0345575fb0647f5..f516a0b740a95720899b94659073fb33f3f1a7d0 100644 (file)
@@ -94,6 +94,9 @@ class ArvCwlRunner(object):
 
     def arv_make_tool(self, toolpath_object, **kwargs):
         kwargs["work_api"] = self.work_api
+        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
+                                                api_client=self.api,
+                                                keep_client=self.keep_client)
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
index 08da4ca16efa1185aa40642817329b8b2f18d005..cea2b1091d400be70014cc512029db8d2b872955 100644 (file)
@@ -2,6 +2,8 @@ import logging
 import json
 import os
 
+import ruamel.yaml as yaml
+
 from cwltool.errors import WorkflowException
 from cwltool.process import get_feature, UnsupportedRequirement, shortname
 from cwltool.pathmapper import adjustFiles
@@ -12,6 +14,7 @@ import arvados.collection
 from .arvdocker import arv_docker_get_image
 from . import done
 from .runner import Runner, arvados_jobs_image
+from .fsaccess import CollectionFetcher
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -177,28 +180,9 @@ class RunnerContainer(Runner):
                 json.dump(self.job_order, f, sort_keys=True, indent=4)
             jobobj.save_new(owner_uuid=self.arvrunner.project_uuid)
 
-        workflowname = os.path.basename(self.tool.tool["id"])
-        workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
-        workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
-        workflowcollection = workflowcollection[5:workflowcollection.index('/')]
         jobpath = "/var/lib/cwl/job/cwl.input.json"
 
-        command = ["arvados-cwl-runner", "--local", "--api=containers"]
-        if self.output_name:
-            command.append("--output-name=" + self.output_name)
-
-        if self.output_tags:
-            command.append("--output-tags=" + self.output_tags)
-
-        if self.enable_reuse:
-            command.append("--enable-reuse")
-        else:
-            command.append("--disable-reuse")
-
-        command.extend([workflowpath, jobpath])
-
-        return {
-            "command": command,
+        container_req = {
             "owner_uuid": self.arvrunner.project_uuid,
             "name": self.name,
             "output_path": "/var/spool/cwl",
@@ -207,10 +191,6 @@ class RunnerContainer(Runner):
             "state": "Committed",
             "container_image": arvados_jobs_image(self.arvrunner),
             "mounts": {
-                "/var/lib/cwl/workflow": {
-                    "kind": "collection",
-                    "portable_data_hash": "%s" % workflowcollection
-                },
                 jobpath: {
                     "kind": "collection",
                     "portable_data_hash": "%s/cwl.input.json" % jobobj.portable_data_hash()
@@ -231,6 +211,45 @@ class RunnerContainer(Runner):
             }
         }
 
+        workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
+        if workflowcollection.startswith("keep:"):
+            workflowcollection = workflowcollection[5:workflowcollection.index('/')]
+            workflowname = os.path.basename(self.tool.tool["id"])
+            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
+            container_req["mounts"]["/var/lib/cwl/workflow"] = {
+                "kind": "collection",
+                "portable_data_hash": "%s" % workflowcollection
+                }
+        elif workflowcollection.startswith("arvwf:"):
+            workflowpath = "/var/lib/cwl/workflow.json#main"
+            fetcher = CollectionFetcher({}, None,
+                                        api_client=self.arvrunner.api,
+                                        keep_client=self.arvrunner.keep_client)
+            wfobj = yaml.safe_load(fetcher.fetch_text(workflowcollection))
+            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+                "kind": "json",
+                "json": wfobj
+            }
+
+        command = ["arvados-cwl-runner", "--local", "--api=containers"]
+        if self.output_name:
+            command.append("--output-name=" + self.output_name)
+
+        if self.output_tags:
+            command.append("--output-tags=" + self.output_tags)
+
+        if self.enable_reuse:
+            command.append("--enable-reuse")
+        else:
+            command.append("--disable-reuse")
+
+        command.extend([workflowpath, jobpath])
+
+        container_req["command"] = command
+
+        return container_req
+
+
     def run(self, *args, **kwargs):
         kwargs["keepprefix"] = "keep:"
         job_spec = self.arvados_job_spec(*args, **kwargs)
index 9201ab60076add6142d90d1afd7196c88426e162..d1a64633ed729f0357898ff2fff2bee32b9d18a9 100644 (file)
@@ -4,6 +4,8 @@ import errno
 import urlparse
 import re
 
+import ruamel.yaml as yaml
+
 import cwltool.stdfsaccess
 from cwltool.pathmapper import abspath
 import cwltool.resolver
@@ -134,16 +136,16 @@ class CollectionFetcher(DefaultFetcher):
 
     def fetch_text(self, url):
         if url.startswith("keep:"):
-            with self.fsaccess.open(url) as f:
+            with self.fsaccess.open(url, "r") as f:
                 return f.read()
-        if url.startswith("arv:"):
-            return self.api_client.workflows().get(uuid=url[4:]).execute()["definition"]
+        if url.startswith("arvwf:"):
+            return self.api_client.workflows().get(uuid=url[6:]).execute()["definition"]
         return super(CollectionFetcher, self).fetch_text(url)
 
     def check_exists(self, url):
         if url.startswith("keep:"):
             return self.fsaccess.exists(url)
-        if url.startswith("arv:"):
+        if url.startswith("arvwf:"):
             if self.fetch_text(url):
                 return True
         return super(CollectionFetcher, self).check_exists(url)
@@ -157,7 +159,7 @@ class CollectionFetcher(DefaultFetcher):
             return url
 
         basesp = urlparse.urlsplit(base_url)
-        if basesp.scheme == "keep":
+        if basesp.scheme in ("keep", "arvwf"):
             if not basesp.path:
                 raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
 
@@ -166,7 +168,7 @@ class CollectionFetcher(DefaultFetcher):
 
             pdh = baseparts.pop(0)
 
-            if not arvados.util.keep_locator_pattern.match(pdh):
+            if basesp.scheme == "keep" and not arvados.util.keep_locator_pattern.match(pdh):
                 raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
 
             if urlsp.path.startswith("/"):
@@ -177,7 +179,7 @@ class CollectionFetcher(DefaultFetcher):
                 baseparts.pop()
 
             path = "/".join([pdh] + baseparts + urlparts)
-            return urlparse.urlunsplit(("keep", "", path, "", urlsp.fragment))
+            return urlparse.urlunsplit((basesp.scheme, "", path, "", urlsp.fragment))
 
         return super(CollectionFetcher, self).urljoin(base_url, url)
 
@@ -185,11 +187,11 @@ workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
 
 def collectionResolver(api_client, document_loader, uri):
     if workflow_uuid_pattern.match(uri):
-        return "arv:%s" % uri
+        return "arvwf:%s#main" % (uri)
 
     p = uri.split("/")
     if arvados.util.keep_locator_pattern.match(p[0]):
-        return "keep:" + uri
+        return "keep:%s" % (uri)
 
     if arvados.util.collection_uuid_pattern.match(p[0]):
         return "keep:%s%s" % (self.api_client.collections().
index 58500d3a993ddb74327c419925c2aed2b769a1b6..74d9481ff53003a458fa38e237b84342ad6fd427 100644 (file)
@@ -47,6 +47,8 @@ class ArvPathMapper(PathMapper):
                         pass
                     else:
                         raise WorkflowException("File literal '%s' is missing contents" % src)
+                elif src.startswith("arvwf:"):
+                    self._pathmap[src] = MapperEnt(src, src, "File")
                 else:
                     raise WorkflowException("Input file path '%s' is invalid" % st)
             if "secondaryFiles" in srcobj:
index d7858cf5e0039fe12fd413bff9812a0342a99cd8..d6497c44eb36f323f507a8f0f2db7fb94e2e1b58 100644 (file)
@@ -57,7 +57,7 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     loaded = set()
     def loadref(b, u):
-        joined = urlparse.urljoin(b, u)
+        joined = document_loader.fetcher.urljoin(b, u)
         defrg, _ = urlparse.urldefrag(joined)
         if defrg not in loaded:
             loaded.add(defrg)
@@ -85,7 +85,7 @@ def upload_dependencies(arvrunner, name, document_loader,
     sc = scandeps(uri, scanobj,
                   loadref_fields,
                   set(("$include", "$schemas", "location")),
-                  loadref)
+                  loadref, urljoin=document_loader.fetcher.urljoin)
 
     normalizeFilesDirs(sc)
 
index 70397fa157a331669c8a1fab4c72a9e73744ffe9..d489e69897ac84d90daf22777237a2a409b01555 100644 (file)
@@ -492,7 +492,7 @@ class TestSubmit(unittest.TestCase):
     @mock.patch("arvados.collection.CollectionReader")
     @mock.patch("time.sleep")
     @stubs
-    def test_submit_keepref(self, stubs, tm, collectionReader):
+    def test_submit_file_keepref(self, stubs, tm, collectionReader):
         capture_stdout = cStringIO.StringIO()
         exited = arvados_cwl.main(
             ["--submit", "--no-wait", "--api=containers", "--debug",
@@ -501,6 +501,143 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual(exited, 0)
 
 
+    @mock.patch("arvados.collection.CollectionReader")
+    @mock.patch("time.sleep")
+    @stubs
+    def test_submit_keepref(self, stubs, tm, reader):
+        capture_stdout = cStringIO.StringIO()
+
+        with open("tests/wf/expect_arvworkflow.cwl") as f:
+            reader().open().__enter__().read.return_value = f.read()
+
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+             "keep:99999999999999999999999999999994+99/expect_arvworkflow.cwl#main", "-x", "XxX"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        expect_container = {
+            'priority': 1,
+            'mounts': {
+                '/var/spool/cwl': {
+                    'writable': True,
+                    'kind': 'collection'
+                },
+                'stdout': {
+                    'path': '/var/spool/cwl/cwl.output.json',
+                    'kind': 'file'
+                },
+                '/var/lib/cwl/workflow': {
+                    'portable_data_hash': '99999999999999999999999999999994+99',
+                    'kind': 'collection'
+                },
+                '/var/lib/cwl/job/cwl.input.json': {
+                    'portable_data_hash': 'e5454f8ca7d5b181e21ecd45841e3373+58/cwl.input.json',
+                    'kind': 'collection'}
+            }, 'state': 'Committed',
+            'owner_uuid': None,
+            'output_path': '/var/spool/cwl',
+            'name': 'expect_arvworkflow.cwl#main',
+            'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
+            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/job/cwl.input.json'],
+            'cwd': '/var/spool/cwl',
+            'runtime_constraints': {
+                'API': True,
+                'vcpus': 1,
+                'ram': 1073741824
+            }
+        }
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=expect_container)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+
+    @mock.patch("time.sleep")
+    @stubs
+    def test_submit_arvworkflow(self, stubs, tm):
+        capture_stdout = cStringIO.StringIO()
+
+        with open("tests/wf/expect_arvworkflow.cwl") as f:
+            stubs.api.workflows().get().execute.return_value = {"definition": f.read()}
+
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+             "962eh-7fd4e-gkbzl62qqtfig37", "-x", "XxX"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        expect_container = {
+            'priority': 1,
+            'mounts': {
+                '/var/spool/cwl': {
+                    'writable': True,
+                    'kind': 'collection'
+                },
+                'stdout': {
+                    'path': '/var/spool/cwl/cwl.output.json',
+                    'kind': 'file'
+                },
+                '/var/lib/cwl/workflow.json': {
+                    'kind': 'json',
+                    'json': {
+                        'cwlVersion': 'v1.0',
+                        '$graph': [
+                            {
+                                'inputs': [
+                                    {
+                                        'inputBinding': {'position': 1},
+                                        'type': 'string',
+                                        'id': '#submit_tool.cwl/x'}
+                                ],
+                                'requirements': [
+                                    {'dockerPull': 'debian:8', 'class': 'DockerRequirement'}
+                                ],
+                                'id': '#submit_tool.cwl',
+                                'outputs': [],
+                                'baseCommand': 'cat',
+                                'class': 'CommandLineTool'
+                            }, {
+                                'id': '#main',
+                                'inputs': [
+                                    {'type': 'string', 'id': '#main/x'}
+                                ],
+                                'steps': [
+                                    {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
+                                     'run': '#submit_tool.cwl',
+                                     'id': '#main/step1',
+                                     'out': []}
+                                ],
+                                'class': 'Workflow',
+                                'outputs': []
+                            }
+                        ]
+                    }
+                },
+                '/var/lib/cwl/job/cwl.input.json': {
+                    'portable_data_hash': 'e5454f8ca7d5b181e21ecd45841e3373+58/cwl.input.json',
+                    'kind': 'collection'}
+            }, 'state': 'Committed',
+            'owner_uuid': None,
+            'output_path': '/var/spool/cwl',
+            'name': 'arvwf:962eh-7fd4e-gkbzl62qqtfig37#main',
+            'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
+            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/job/cwl.input.json'],
+            'cwd': '/var/spool/cwl',
+            'runtime_constraints': {
+                'API': True,
+                'vcpus': 1,
+                'ram': 1073741824
+            }
+        }
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=expect_container)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+
     @mock.patch("arvados.commands.keepdocker.find_one_image_hash")
     @mock.patch("cwltool.docker.get_image")
     @mock.patch("arvados.api")
diff --git a/sdk/cwl/tests/wf/expect_arvworkflow.cwl b/sdk/cwl/tests/wf/expect_arvworkflow.cwl
new file mode 100644 (file)
index 0000000..56ce0d5
--- /dev/null
@@ -0,0 +1,24 @@
+$graph:
+- baseCommand: cat
+  class: CommandLineTool
+  id: '#submit_tool.cwl'
+  inputs:
+  - id: '#submit_tool.cwl/x'
+    inputBinding: {position: 1}
+    type: string
+  outputs: []
+  requirements:
+  - {class: DockerRequirement, dockerPull: 'debian:8'}
+- class: Workflow
+  id: '#main'
+  inputs:
+  - id: '#main/x'
+    type: string
+  outputs: []
+  steps:
+  - id: '#main/step1'
+    in:
+    - {id: '#main/step1/x', source: '#main/x'}
+    out: []
+    run: '#submit_tool.cwl'
+cwlVersion: v1.0