Merge branch '10221-cwl-pathmapping' closes #10221
author Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 13 Oct 2016 19:02:32 +0000 (15:02 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 13 Oct 2016 19:02:32 +0000 (15:02 -0400)
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/tests/test_submit.py
sdk/python/arvados/arvfile.py
sdk/python/tests/test_collections.py
services/api/app/models/container.rb
services/api/app/models/container_request.rb
services/api/test/unit/container_request_test.rb
services/fuse/arvados_fuse/command.py
services/fuse/tests/test_command_args.py
services/fuse/tests/test_mount.py

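diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c8ba5a4f50afbbd7658eb2494870433db89788ad..7ebb13f1bb48af456ce50f2c7d8629e03c975cb0 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py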
@@ -293,7 +293,7 @@ class ArvCwlRunner(object):
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
-            runnerjob.run()
+            runnerjob.run(wait=kwargs.get("wait"))
             return runnerjob.uuid
 
         self.poll_api = arvados.api('v1')
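diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 8b1a9346830b349e145fcb0fe5ec3f3cf1acf823..52504100864fbf41a8bad7ad144417b9c88999e1 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py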
@@ -2,6 +2,7 @@ import logging
 import re
 import copy
 import json
+import time
 
 from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
@@ -233,6 +234,12 @@ class RunnerJob(Runner):
 
         workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
 
+        # cwltool adds a "job_order" entry when input parameters are given on
+        # the command line.  Filter it out here, because
+        # arv-run-pipeline-instance doesn't like it.
+        if "job_order" in self.job_order:
+            del self.job_order["job_order"]
+
         self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
         if self.output_name:
             self.job_order["arv:output_name"] = self.output_name
@@ -248,28 +255,32 @@ class RunnerJob(Runner):
 
     def run(self, *args, **kwargs):
         job_spec = self.arvados_job_spec(*args, **kwargs)
-        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-
-        response = self.arvrunner.api.jobs().create(
-            body=job_spec,
-            find_or_create=self.enable_reuse
-        ).execute(num_retries=self.arvrunner.num_retries)
 
-        self.uuid = response["uuid"]
+        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
+            body={
+                "owner_uuid": self.arvrunner.project_uuid,
+                "name": shortname(self.tool.tool["id"]),
+                "components": {"cwl-runner": job_spec },
+                "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
+        logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
+
+        if kwargs.get("wait") is False:
+            self.uuid = self.arvrunner.pipeline["uuid"]
+            return
+
+        job = None
+        while not job:
+            time.sleep(2)
+            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().get(
+                uuid=self.arvrunner.pipeline["uuid"]).execute(
+                    num_retries=self.arvrunner.num_retries)
+            job = self.arvrunner.pipeline["components"]["cwl-runner"].get("job")
+
+        self.uuid = job["uuid"]
         self.arvrunner.processes[self.uuid] = self
 
-        logger.info("Submitted job %s", response["uuid"])
-
-        if kwargs.get("submit"):
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "name": shortname(self.tool.tool["id"]),
-                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
-                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
-        if response["state"] in ("Complete", "Failed", "Cancelled"):
-            self.done(response)
+        if job["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(job)
 
 
 class RunnerTemplate(object):
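diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 007f809d4140a89136b9419e7e6d9fd2d801477f..f65a82bca6e7e1fc68c5f6a5dbdd1699b7903625 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py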
@@ -118,6 +118,38 @@ def stubs(func):
             'script_version': 'master',
             'script': 'cwl-runner'
         }
+        stubs.pipeline_component = stubs.expect_job_spec.copy()
+        stubs.expect_pipeline_instance = {
+            'name': 'submit_wf.cwl',
+            'state': 'RunningOnServer',
+            "components": {
+                "cwl-runner": {
+                    'runtime_constraints': {'docker_image': 'arvados/jobs'},
+                    'script_parameters': {
+                        'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'},
+                        'x': {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'},
+                        'z': {'basename': 'anonymous', 'class': 'Directory',
+                              'listing': [
+                                  {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
+                              ]},
+                        'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+                    },
+                    'repository': 'arvados',
+                    'script_version': 'master',
+                    'script': 'cwl-runner'
+                }
+            }
+        }
+        stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
+        stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+        stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid
+        stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
+        stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
+            "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+            "state": "Queued"
+        }
+        stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
+        stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
 
         stubs.expect_container_spec = {
             'priority': 1,
@@ -163,11 +195,12 @@ def stubs(func):
 
 
 class TestSubmit(unittest.TestCase):
+    @mock.patch("time.sleep")
     @stubs
-    def test_submit(self, stubs):
+    def test_submit(self, stubs, tm):
         capture_stdout = cStringIO.StringIO()
         exited = arvados_cwl.main(
-            ["--submit", "--no-wait",
+            ["--submit", "--no-wait", "--debug",
              "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
             capture_stdout, sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
@@ -198,16 +231,16 @@ class TestSubmit(unittest.TestCase):
             }, ensure_unique_name=True),
             mock.call().execute()])
 
-        expect_job = copy.deepcopy(stubs.expect_job_spec)
-        expect_job["owner_uuid"] = stubs.fake_user_uuid
-        stubs.api.jobs().create.assert_called_with(
-            body=expect_job,
-            find_or_create=True)
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
         self.assertEqual(capture_stdout.getvalue(),
-                         stubs.expect_job_uuid + '\n')
+                         stubs.expect_pipeline_uuid + '\n')
 
+    @mock.patch("time.sleep")
     @stubs
-    def test_submit_with_project_uuid(self, stubs):
+    def test_submit_with_project_uuid(self, stubs, tm):
         project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
 
         exited = arvados_cwl.main(
@@ -217,11 +250,10 @@ class TestSubmit(unittest.TestCase):
             sys.stdout, sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
 
-        expect_body = copy.deepcopy(stubs.expect_job_spec)
-        expect_body["owner_uuid"] = project_uuid
-        stubs.api.jobs().create.assert_called_with(
-            body=expect_body,
-            find_or_create=True)
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = project_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
 
     @stubs
     def test_submit_container(self, stubs):
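diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 1ca7ad82ed845844a2e18cc7bf4b7a9d24c3d652..c394dab810715c2659b6f72f8f5f1e173d711ead 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py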
@@ -834,14 +834,14 @@ class ArvadosFile(object):
             self._writers.add(writer)
 
     @synchronized
-    def remove_writer(self, writer):
+    def remove_writer(self, writer, flush):
         """
         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
         and do some block maintenance tasks.
         """
         self._writers.remove(writer)
 
-        if self.size() > config.KEEP_BLOCK_SIZE / 2:
+        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
             # File writer closed, not small enough for repacking
             self.flush()
         elif self.closed():
@@ -1166,7 +1166,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def flush(self):
         self.arvadosfile.flush()
 
-    def close(self):
+    def close(self, flush=True):
         if not self.closed:
-            self.arvadosfile.remove_writer(self)
+            self.arvadosfile.remove_writer(self, flush)
             super(ArvadosFileWriter, self).close()
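diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 8d3e9d62a1c32cf15c17fa3e264c58d19cd6f9ee..fc30a242eba1bfc665a05747de66f999869ef8a4 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py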
@@ -1110,13 +1110,16 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
     def test_only_small_blocks_are_packed_together(self):
         c = Collection()
         # Write a couple of small files, 
-        with c.open("count.txt", "w") as f:
-            f.write("0123456789")
-        with c.open("foo.txt", "w") as foo:
-            foo.write("foo")
+        f = c.open("count.txt", "w")
+        f.write("0123456789")
+        f.close(flush=False)
+        foo = c.open("foo.txt", "w")
+        foo.write("foo")
+        foo.close(flush=False)
         # Then, write a big file, it shouldn't be packed with the ones above
-        with c.open("bigfile.txt", "w") as big:
-            big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
+        big = c.open("bigfile.txt", "w")
+        big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
+        big.close(flush=False)
         self.assertEqual(
             c.manifest_text("."),
             '. 2d303c138c118af809f39319e5d507e9+34603008 a8430a058b8fbf408e1931b794dbd6fb+13 0:34603008:bigfile.txt 34603008:10:count.txt 34603018:3:foo.txt\n')
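diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 43c5b30a1f603fd94b828b3449f71c01b5ba716a..3a16e30e9ec545840b3be592138a0b2aacf34694 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb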
@@ -166,6 +166,10 @@ class Container < ArvadosModel
             uuids: uuid_list)
   end
 
+  def final?
+    [Complete, Cancelled].include?(self.state)
+  end
+
   protected
 
   def fill_field_defaults
@@ -305,7 +309,7 @@ class Container < ArvadosModel
   def handle_completed
     # This container is finished so finalize any associated container requests
     # that are associated with this container.
-    if self.state_changed? and [Complete, Cancelled].include? self.state
+    if self.state_changed? and self.final?
       act_as_system_user do
 
         if self.state == Cancelled
@@ -337,7 +341,7 @@ class Container < ArvadosModel
         # Notify container requests associated with this container
         ContainerRequest.where(container_uuid: uuid,
                                state: ContainerRequest::Committed).each do |cr|
-          cr.container_completed!
+          cr.finalize!
         end
 
         # Try to cancel any outstanding container requests made by this container.
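diff --git a/services/api/app/models/container_request.rb b/services/api/app/models/container_request.rb
index a588c86451a88c2205472ba2dcc19eaff3dd15d3..696b873bde383ade05993a7d7c33800b70dffe04 100644
--- a/services/api/app/models/container_request.rb
+++ b/services/api/app/models/container_request.rb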
@@ -19,6 +19,7 @@ class ContainerRequest < ArvadosModel
   validate :validate_change
   validate :validate_runtime_constraints
   after_save :update_priority
+  after_save :finalize_if_needed
   before_create :set_requesting_container_uuid
 
   api_accessible :user, extend: :common do |t|
@@ -65,10 +66,19 @@ class ContainerRequest < ArvadosModel
     %w(modified_by_client_uuid container_uuid requesting_container_uuid)
   end
 
+  def finalize_if_needed
+    if state == Committed && Container.find_by_uuid(container_uuid).final?
+      reload
+      act_as_system_user do
+        finalize!
+      end
+    end
+  end
+
   # Finalize the container request after the container has
   # finished/cancelled.
-  def container_completed!
-    update_attributes!(state: ContainerRequest::Final)
+  def finalize!
+    update_attributes!(state: Final)
     c = Container.find_by_uuid(container_uuid)
     ['output', 'log'].each do |out_type|
       pdh = c.send(out_type)
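diff --git a/services/api/test/unit/container_request_test.rb b/services/api/test/unit/container_request_test.rb
index 3b175742370b93bf983754d06e459115a7180d84..1c5c7ae5cea5a55da5c2af9500e42321cd5811f8 100644
--- a/services/api/test/unit/container_request_test.rb
+++ b/services/api/test/unit/container_request_test.rb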
@@ -481,4 +481,31 @@ class ContainerRequestTest < ActiveSupport::TestCase
     assert_equal prev_container_uuid, cr.container_uuid
   end
 
+  test "Finalize committed request when reusing a finished container" do
+    set_user_from_auth :active
+    cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+    cr.reload
+    assert_equal ContainerRequest::Committed, cr.state
+    act_as_system_user do
+      c = Container.find_by_uuid(cr.container_uuid)
+      c.update_attributes!(state: Container::Locked)
+      c.update_attributes!(state: Container::Running)
+      c.update_attributes!(state: Container::Complete,
+                           exit_code: 0,
+                           output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
+                           log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
+    end
+    cr.reload
+    assert_equal ContainerRequest::Final, cr.state
+
+    cr2 = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+    assert_equal cr.container_uuid, cr2.container_uuid
+    assert_equal ContainerRequest::Final, cr2.state
+
+    cr3 = create_minimal_req!(priority: 1, state: ContainerRequest::Uncommitted)
+    assert_equal ContainerRequest::Uncommitted, cr3.state
+    cr3.update_attributes!(state: ContainerRequest::Committed)
+    assert_equal cr.container_uuid, cr3.container_uuid
+    assert_equal ContainerRequest::Final, cr3.state
+  end
 end
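diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index d15f01792a8e8dd4b433611ce20a752c1138c877..3f89732bea25dcd1ca546fbef126227e9e0a9256 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py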
@@ -77,6 +77,8 @@ class ArgumentParser(argparse.ArgumentParser):
         self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 256MiB)", default=256*1024*1024)
         self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128MiB)", default=128*1024*1024)
 
+        self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
+
         self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
         self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)
 
@@ -111,7 +113,7 @@ class Mount(object):
 
     def __enter__(self):
         llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
-        if self.listen_for_events:
+        if self.listen_for_events and not self.args.disable_event_listening:
             self.operations.listen_for_events()
         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
         self.llfuse_thread.daemon = True
@@ -330,7 +332,7 @@ From here, the following directories are available:
                 self.daemon_ctx.open()
 
             # Subscribe to change events from API server
-            if self.listen_for_events:
+            if self.listen_for_events and not self.args.disable_event_listening:
                 self.operations.listen_for_events()
 
             self._llfuse_main()
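diff --git a/services/fuse/tests/test_command_args.py b/services/fuse/tests/test_command_args.py
index bb80d0a2fc94dc4c77c0f46f59414a8d00627235..e8488d7ff967179423f3732c8e6e56b05194ed58 100644
--- a/services/fuse/tests/test_command_args.py
+++ b/services/fuse/tests/test_command_args.py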
@@ -170,6 +170,20 @@ class MountArgsTest(unittest.TestCase):
                          run_test_server.fixture('users')['active']['uuid'])
         self.assertEqual(True, self.mnt.listen_for_events)
 
+    @noexit
+    @mock.patch('arvados.events.subscribe')
+    def test_disable_event_listening(self, mock_subscribe):
+        args = arvados_fuse.command.ArgumentParser().parse_args([
+            '--disable-event-listening',
+            '--by-id',
+            '--foreground', self.mntdir])
+        self.mnt = arvados_fuse.command.Mount(args)
+        self.assertEqual(True, self.mnt.listen_for_events)
+        self.assertEqual(True, self.mnt.args.disable_event_listening)
+        with self.mnt:
+            pass
+        self.assertEqual(0, mock_subscribe.call_count)
+
     @noexit
     @mock.patch('arvados.events.subscribe')
     def test_custom(self, mock_subscribe):
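diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 8b6d01969a7819f52975c511f1d7a954aaaa324d..8e4510355d80d996a0f02a730945df20e56611e6 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py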
@@ -1163,6 +1163,7 @@ class TokenExpiryTest(MountTestBase):
     def setUp(self):
         super(TokenExpiryTest, self).setUp(local_store=False)
 
+    @unittest.skip("bug #10008")
     @mock.patch('arvados.keep.KeepClient.get')
     def runTest(self, mocked_get):
         self.api._rootDesc = {"blobSignatureTtl": 2}