10576: Add resolver to execute from keep references and arvados workflow
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 5 Dec 2016 15:40:42 +0000 (10:40 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 5 Dec 2016 15:40:42 +0000 (10:40 -0500)
records.  Add test for keep references in workflow default input.

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/test_urljoin.py
sdk/cwl/tests/wf/expect_packed.cwl
sdk/cwl/tests/wf/submit_keepref_wf.cwl [new file with mode: 0644]

index c7c186390bc18d9cbe24b58a03079940352d63e3..c953b4e5bae34591a5c27754d0345575fb0647f5 100644 (file)
@@ -29,7 +29,7 @@ from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from. runner import Runner, upload_instance
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
 from .perf import Perf
 from .pathmapper import FinalOutputPathMapper
 from ._version import __version__
@@ -623,4 +623,5 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
                                                     keep_client=keep_client),
                              fetcher_constructor=partial(CollectionFetcher,
                                                          api_client=api_client,
-                                                         keep_client=keep_client))
+                                                         keep_client=keep_client),
+                             resolver=partial(collectionResolver, api_client))
index 9cb73d3582dc29ffe0b585bc97fca873ebcbdfa0..9201ab60076add6142d90d1afd7196c88426e162 100644 (file)
@@ -2,9 +2,11 @@ import fnmatch
 import os
 import errno
 import urlparse
+import re
 
 import cwltool.stdfsaccess
 from cwltool.pathmapper import abspath
+import cwltool.resolver
 
 import arvados.util
 import arvados.collection
@@ -127,17 +129,28 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
 class CollectionFetcher(DefaultFetcher):
     def __init__(self, cache, session, api_client=None, keep_client=None):
         super(CollectionFetcher, self).__init__(cache, session)
+        self.api_client = api_client
         self.fsaccess = CollectionFsAccess("", api_client=api_client, keep_client=keep_client)
 
     def fetch_text(self, url):
         if url.startswith("keep:"):
             with self.fsaccess.open(url) as f:
                 return f.read()
+        if url.startswith("arv:"):
+            # "arv:<uuid>" names a workflow record; return its definition.
+            return self.api_client.workflows().get(uuid=url[4:]).execute()["definition"]
         return super(CollectionFetcher, self).fetch_text(url)
 
     def check_exists(self, url):
         if url.startswith("keep:"):
             return self.fsaccess.exists(url)
+        if url.startswith("arv:"):
+            # Don't fall through to the base fetcher for "arv:" URLs; a
+            # workflow record that cannot be fetched counts as nonexistent.
+            try:
+                return bool(self.fetch_text(url))
+            except arvados.errors.ApiError:
+                return False
         return super(CollectionFetcher, self).check_exists(url)
 
     def urljoin(self, base_url, url):
@@ -172,3 +185,29 @@ class CollectionFetcher(DefaultFetcher):
             return urlparse.urlunsplit(("keep", "", path, "", urlsp.fragment))
 
         return super(CollectionFetcher, self).urljoin(base_url, url)
+
+workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
+
+def collectionResolver(api_client, document_loader, uri):
+    """Map a bare Arvados identifier to a URI that CollectionFetcher can load.
+
+    Workflow record UUIDs become "arv:" URIs; Keep locators and collection
+    UUIDs, optionally followed by "/path", become "keep:" URIs (collection
+    UUIDs are looked up through api_client to get the portable data hash).
+    Anything else is delegated to cwltool.resolver.tool_resolver.
+    """
+    if workflow_uuid_pattern.match(uri):
+        return "arv:%s" % uri
+
+    p = uri.split("/")
+    if arvados.util.keep_locator_pattern.match(p[0]):
+        return "keep:" + uri
+
+    if arvados.util.collection_uuid_pattern.match(p[0]):
+        # Look up only the UUID component; the rest of the URI is a path
+        # inside the collection and is appended unchanged.
+        return "keep:%s%s" % (api_client.collections().
+                              get(uuid=p[0]).execute()["portable_data_hash"],
+                              uri[len(p[0]):])
+
+    return cwltool.resolver.tool_resolver(document_loader, uri)
index c8813adf7ea48d71f5b5b8cb9d7c091bfafa09c0..7dbc9c8ca101bddb96735053bda5f83049a82368 100644 (file)
@@ -164,6 +164,7 @@ class TestJob(unittest.TestCase):
         arvjob.builder = mock.MagicMock()
         arvjob.output_callback = mock.MagicMock()
         arvjob.collect_outputs = mock.MagicMock()
+        arvjob.collect_outputs.return_value = {"out": "stuff"}
 
         arvjob.done({
             "state": "Complete",
@@ -189,6 +190,8 @@ class TestJob(unittest.TestCase):
                   'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                   'name': 'Output 9999999 of testjob'})
 
+        arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
+
     @mock.patch("arvados.collection.CollectionReader")
     def test_done_use_existing_collection(self, reader):
         api = mock.MagicMock()
@@ -211,6 +214,7 @@ class TestJob(unittest.TestCase):
         arvjob.builder = mock.MagicMock()
         arvjob.output_callback = mock.MagicMock()
         arvjob.collect_outputs = mock.MagicMock()
+        arvjob.collect_outputs.return_value = {"out": "stuff"}
 
         arvjob.done({
             "state": "Complete",
@@ -228,6 +232,8 @@ class TestJob(unittest.TestCase):
 
         self.assertFalse(api.collections().create.called)
 
+        arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
+
 
 class TestWorkflow(unittest.TestCase):
     # The test passes no builder.resources
index dea04b3222ea13186f38c9c309ca0bf6fe9ba893..70397fa157a331669c8a1fab4c72a9e73744ffe9 100644 (file)
@@ -489,6 +489,18 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_container_request_uuid + '\n')
 
+    @mock.patch("arvados.collection.CollectionReader")
+    @mock.patch("time.sleep")
+    @stubs
+    def test_submit_keepref(self, stubs, tm, collectionReader):
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+             "tests/wf/submit_keepref_wf.cwl"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+
     @mock.patch("arvados.commands.keepdocker.find_one_image_hash")
     @mock.patch("cwltool.docker.get_image")
     @mock.patch("arvados.api")
index eaede53a1b8a8d2878c867d28d8d1dfb44e9b580..b9c8cea50353b26be6d8d3d93852120e0584792e 100644 (file)
@@ -51,3 +51,22 @@ class TestUrljoin(unittest.TestCase):
 
         self.assertEquals("keep:99999999999999999999999999999991+99/dir/wh.py",
                           cf.urljoin("keep:99999999999999999999999999999991+99/dir/", "wh.py"))
+
+    def test_resolver(self):
+        import mock
+        import arvados_cwl.fsaccess
+
+        api = mock.MagicMock()
+        resolve = arvados_cwl.fsaccess.collectionResolver
+
+        # Workflow record UUIDs resolve to "arv:" URIs.
+        self.assertEquals("arv:zzzzz-7fd4e-zzzzzzzzzzzzzzz",
+                          resolve(api, None, "zzzzz-7fd4e-zzzzzzzzzzzzzzz"))
+
+        # Keep locators, with or without a path, resolve to "keep:" URIs
+        # without consulting the API server.
+        self.assertEquals("keep:99999999999999999999999999999991+99",
+                          resolve(api, None, "99999999999999999999999999999991+99"))
+        self.assertEquals("keep:99999999999999999999999999999991+99/dir/wh.py",
+                          resolve(api, None, "99999999999999999999999999999991+99/dir/wh.py"))
+        self.assertFalse(api.collections.called)
index 25d02b2b809dcb7ad0272485eadb357b9e6f2eb6..1622f4841b41e62e2016d582a5e6ea515c23755c 100644 (file)
@@ -9,7 +9,7 @@ $graph:
     type: File
   outputs: []
   requirements:
-  - {class: DockerRequirement, dockerImageId: 'debian:8', dockerPull: 'debian:8'}
+  - {class: DockerRequirement, dockerPull: 'debian:8'}
 - class: Workflow
   id: '#main'
   inputs:
diff --git a/sdk/cwl/tests/wf/submit_keepref_wf.cwl b/sdk/cwl/tests/wf/submit_keepref_wf.cwl
new file mode 100644 (file)
index 0000000..f07714e
--- /dev/null
@@ -0,0 +1,20 @@
+# Test case for arvados-cwl-runner
+#
+# Used to test submitting a workflow whose input default refers to a
+# file already stored in Keep (a "keep:" location).
+
+class: Workflow
+cwlVersion: v1.0
+inputs:
+  x:
+    type: File
+    default:
+      class: File
+      location: keep:99999999999999999999999999999994+99/blorp.txt
+outputs: []
+steps:
+  step1:
+    in:
+      x: x
+    out: []
+    run: ../tool/submit_tool.cwl