From: Peter Amstutz Date: Thu, 13 Oct 2016 19:02:32 +0000 (-0400) Subject: Merge branch '10221-cwl-pathmapping' closes #10221 X-Git-Tag: 1.1.0~666 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/ef6f7202858cba65e06cc1a32d52ee2305687bc8?hp=807125b8423058d336165f069bf5d618f77845b7 Merge branch '10221-cwl-pathmapping' closes #10221 --- diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index c8ba5a4f50..7ebb13f1bb 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -293,7 +293,7 @@ class ArvCwlRunner(object): logger.info("Pipeline instance %s", self.pipeline["uuid"]) if runnerjob and not kwargs.get("wait"): - runnerjob.run() + runnerjob.run(wait=kwargs.get("wait")) return runnerjob.uuid self.poll_api = arvados.api('v1') diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py index 8b1a934683..5250410086 100644 --- a/sdk/cwl/arvados_cwl/arvjob.py +++ b/sdk/cwl/arvados_cwl/arvjob.py @@ -2,6 +2,7 @@ import logging import re import copy import json +import time from cwltool.process import get_feature, shortname from cwltool.errors import WorkflowException @@ -233,6 +234,12 @@ class RunnerJob(Runner): workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs) + # Need to filter this out, gets added by cwltool when providing + # parameters on the command line, and arv-run-pipeline-instance doesn't + # like it. + if "job_order" in self.job_order: + del self.job_order["job_order"] + self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:] if self.output_name: self.job_order["arv:output_name"] = self.output_name @@ -248,28 +255,32 @@ class RunnerJob(Runner): def run(self, *args, **kwargs): job_spec = self.arvados_job_spec(*args, **kwargs) - job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) - - response = self.arvrunner.api.jobs().create( - body=job_spec, - find_or_create=self.enable_reuse - ).execute(num_retries=self.arvrunner.num_retries) - self.uuid = response["uuid"] + self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create( + body={ + "owner_uuid": self.arvrunner.project_uuid, + "name": shortname(self.tool.tool["id"]), + "components": {"cwl-runner": job_spec }, + "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries) + logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"]) + + if kwargs.get("wait") is False: + self.uuid = self.arvrunner.pipeline["uuid"] + return + + job = None + while not job: + time.sleep(2) + self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().get( + uuid=self.arvrunner.pipeline["uuid"]).execute( + num_retries=self.arvrunner.num_retries) + job = self.arvrunner.pipeline["components"]["cwl-runner"].get("job") + + self.uuid = job["uuid"] self.arvrunner.processes[self.uuid] = self - logger.info("Submitted job %s", response["uuid"]) - - if kwargs.get("submit"): - self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create( - body={ - "owner_uuid": self.arvrunner.project_uuid, - "name": shortname(self.tool.tool["id"]), - "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } }, - "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries) - - if response["state"] in ("Complete", "Failed", "Cancelled"): - self.done(response) + if job["state"] in ("Complete", "Failed", "Cancelled"): + self.done(job) class RunnerTemplate(object): diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py index 007f809d41..f65a82bca6 100644 --- a/sdk/cwl/tests/test_submit.py +++ b/sdk/cwl/tests/test_submit.py @@ -118,6 +118,38 @@ def stubs(func): 'script_version': 'master', 'script': 'cwl-runner' } + stubs.pipeline_component = stubs.expect_job_spec.copy() + stubs.expect_pipeline_instance = { + 'name': 'submit_wf.cwl', + 'state': 'RunningOnServer', + "components": { + "cwl-runner": { + 'runtime_constraints': {'docker_image': 'arvados/jobs'}, + 'script_parameters': { + 'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}, + 'x': {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'}, + 'z': {'basename': 'anonymous', 'class': 'Directory', + 'listing': [ + {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'} + ]}, + 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl' + }, + 'repository': 'arvados', + 'script_version': 'master', + 'script': 'cwl-runner' + } + } + } + stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance) + stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz" + stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid + stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create) + stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = { + "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz", + "state": "Queued" + } + stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create + stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job stubs.expect_container_spec = { 'priority': 1, @@ -163,11 +195,12 @@ def stubs(func): class TestSubmit(unittest.TestCase): + @mock.patch("time.sleep") @stubs - def test_submit(self, stubs): + def test_submit(self, stubs, tm): capture_stdout = cStringIO.StringIO() exited = arvados_cwl.main( - ["--submit", "--no-wait", + ["--submit", "--no-wait", "--debug", "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"], capture_stdout, sys.stderr, api_client=stubs.api) self.assertEqual(exited, 0) @@ -198,16 +231,16 @@ class TestSubmit(unittest.TestCase): }, ensure_unique_name=True), mock.call().execute()]) - expect_job = copy.deepcopy(stubs.expect_job_spec) - expect_job["owner_uuid"] = stubs.fake_user_uuid - stubs.api.jobs().create.assert_called_with( - body=expect_job, - find_or_create=True) + expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance) + expect_pipeline["owner_uuid"] = stubs.fake_user_uuid + stubs.api.pipeline_instances().create.assert_called_with( + body=expect_pipeline) self.assertEqual(capture_stdout.getvalue(), - stubs.expect_job_uuid + '\n') + stubs.expect_pipeline_uuid + '\n') + @mock.patch("time.sleep") @stubs - def test_submit_with_project_uuid(self, stubs): + def test_submit_with_project_uuid(self, stubs, tm): project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz' exited = arvados_cwl.main( @@ -217,11 +250,10 @@ class TestSubmit(unittest.TestCase): sys.stdout, sys.stderr, api_client=stubs.api) self.assertEqual(exited, 0) - expect_body = copy.deepcopy(stubs.expect_job_spec) - expect_body["owner_uuid"] = project_uuid - stubs.api.jobs().create.assert_called_with( - body=expect_body, - find_or_create=True) + expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance) + expect_pipeline["owner_uuid"] = project_uuid + stubs.api.pipeline_instances().create.assert_called_with( + body=expect_pipeline) @stubs def test_submit_container(self, stubs): diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 1ca7ad82ed..c394dab810 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -834,14 +834,14 @@ class ArvadosFile(object): self._writers.add(writer) @synchronized - def remove_writer(self, writer): + def remove_writer(self, writer, flush): """ Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks. """ self._writers.remove(writer) - if self.size() > config.KEEP_BLOCK_SIZE / 2: + if flush or self.size() > config.KEEP_BLOCK_SIZE / 2: # File writer closed, not small enough for repacking self.flush() elif self.closed(): @@ -1166,7 +1166,7 @@ class ArvadosFileWriter(ArvadosFileReader): def flush(self): self.arvadosfile.flush() - def close(self): + def close(self, flush=True): if not self.closed: - self.arvadosfile.remove_writer(self) + self.arvadosfile.remove_writer(self, flush) super(ArvadosFileWriter, self).close() diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py index 8d3e9d62a1..fc30a242eb 100644 --- a/sdk/python/tests/test_collections.py +++ b/sdk/python/tests/test_collections.py @@ -1110,13 +1110,16 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers): def test_only_small_blocks_are_packed_together(self): c = Collection() # Write a couple of small files, - with c.open("count.txt", "w") as f: - f.write("0123456789") - with c.open("foo.txt", "w") as foo: - foo.write("foo") + f = c.open("count.txt", "w") + f.write("0123456789") + f.close(flush=False) + foo = c.open("foo.txt", "w") + foo.write("foo") + foo.close(flush=False) # Then, write a big file, it shouldn't be packed with the ones above - with c.open("bigfile.txt", "w") as big: - big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2 + big = c.open("bigfile.txt", "w") + big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2 + big.close(flush=False) self.assertEqual( c.manifest_text("."), '. 2d303c138c118af809f39319e5d507e9+34603008 a8430a058b8fbf408e1931b794dbd6fb+13 0:34603008:bigfile.txt 34603008:10:count.txt 34603018:3:foo.txt\n') diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb index 43c5b30a1f..3a16e30e9e 100644 --- a/services/api/app/models/container.rb +++ b/services/api/app/models/container.rb @@ -166,6 +166,10 @@ class Container < ArvadosModel uuids: uuid_list) end + def final? + [Complete, Cancelled].include?(self.state) + end + protected def fill_field_defaults @@ -305,7 +309,7 @@ class Container < ArvadosModel def handle_completed # This container is finished so finalize any associated container requests # that are associated with this container. - if self.state_changed? and [Complete, Cancelled].include? self.state + if self.state_changed? and self.final? act_as_system_user do if self.state == Cancelled @@ -337,7 +341,7 @@ class Container < ArvadosModel # Notify container requests associated with this container ContainerRequest.where(container_uuid: uuid, state: ContainerRequest::Committed).each do |cr| - cr.container_completed! + cr.finalize! end # Try to cancel any outstanding container requests made by this container. diff --git a/services/api/app/models/container_request.rb b/services/api/app/models/container_request.rb index a588c86451..696b873bde 100644 --- a/services/api/app/models/container_request.rb +++ b/services/api/app/models/container_request.rb @@ -19,6 +19,7 @@ class ContainerRequest < ArvadosModel validate :validate_change validate :validate_runtime_constraints after_save :update_priority + after_save :finalize_if_needed before_create :set_requesting_container_uuid api_accessible :user, extend: :common do |t| @@ -65,10 +66,19 @@ class ContainerRequest < ArvadosModel %w(modified_by_client_uuid container_uuid requesting_container_uuid) end + def finalize_if_needed + if state == Committed && Container.find_by_uuid(container_uuid).final? + reload + act_as_system_user do + finalize! + end + end + end + # Finalize the container request after the container has # finished/cancelled. - def container_completed! - update_attributes!(state: ContainerRequest::Final) + def finalize! + update_attributes!(state: Final) c = Container.find_by_uuid(container_uuid) ['output', 'log'].each do |out_type| pdh = c.send(out_type) diff --git a/services/api/test/unit/container_request_test.rb b/services/api/test/unit/container_request_test.rb index 3b17574237..1c5c7ae5ce 100644 --- a/services/api/test/unit/container_request_test.rb +++ b/services/api/test/unit/container_request_test.rb @@ -481,4 +481,31 @@ class ContainerRequestTest < ActiveSupport::TestCase assert_equal prev_container_uuid, cr.container_uuid end + test "Finalize committed request when reusing a finished container" do + set_user_from_auth :active + cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed) + cr.reload + assert_equal ContainerRequest::Committed, cr.state + act_as_system_user do + c = Container.find_by_uuid(cr.container_uuid) + c.update_attributes!(state: Container::Locked) + c.update_attributes!(state: Container::Running) + c.update_attributes!(state: Container::Complete, + exit_code: 0, + output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45', + log: 'fa7aeb5140e2848d39b416daeef4ffc5+45') + end + cr.reload + assert_equal ContainerRequest::Final, cr.state + + cr2 = create_minimal_req!(priority: 1, state: ContainerRequest::Committed) + assert_equal cr.container_uuid, cr2.container_uuid + assert_equal ContainerRequest::Final, cr2.state + + cr3 = create_minimal_req!(priority: 1, state: ContainerRequest::Uncommitted) + assert_equal ContainerRequest::Uncommitted, cr3.state + cr3.update_attributes!(state: ContainerRequest::Committed) + assert_equal cr.container_uuid, cr3.container_uuid + assert_equal ContainerRequest::Final, cr3.state + end end diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py index d15f01792a..3f89732bea 100644 --- a/services/fuse/arvados_fuse/command.py +++ b/services/fuse/arvados_fuse/command.py @@ -77,6 +77,8 @@ class ArgumentParser(argparse.ArgumentParser): self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 256MiB)", default=256*1024*1024) self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128MiB)", default=128*1024*1024) + self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False) + self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False) self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False) @@ -111,7 +113,7 @@ class Mount(object): def __enter__(self): llfuse.init(self.operations, self.args.mountpoint, self._fuse_options()) - if self.listen_for_events: + if self.listen_for_events and not self.args.disable_event_listening: self.operations.listen_for_events() self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main()) self.llfuse_thread.daemon = True @@ -330,7 +332,7 @@ From here, the following directories are available: self.daemon_ctx.open() # Subscribe to change events from API server - if self.listen_for_events: + if self.listen_for_events and not self.args.disable_event_listening: self.operations.listen_for_events() self._llfuse_main() diff --git a/services/fuse/tests/test_command_args.py b/services/fuse/tests/test_command_args.py index bb80d0a2fc..e8488d7ff9 100644 --- a/services/fuse/tests/test_command_args.py +++ b/services/fuse/tests/test_command_args.py @@ -170,6 +170,20 @@ class MountArgsTest(unittest.TestCase): run_test_server.fixture('users')['active']['uuid']) self.assertEqual(True, self.mnt.listen_for_events) + @noexit + @mock.patch('arvados.events.subscribe') + def test_disable_event_listening(self, mock_subscribe): + args = arvados_fuse.command.ArgumentParser().parse_args([ + '--disable-event-listening', + '--by-id', + '--foreground', self.mntdir]) + self.mnt = arvados_fuse.command.Mount(args) + self.assertEqual(True, self.mnt.listen_for_events) + self.assertEqual(True, self.mnt.args.disable_event_listening) + with self.mnt: + pass + self.assertEqual(0, mock_subscribe.call_count) + @noexit @mock.patch('arvados.events.subscribe') def test_custom(self, mock_subscribe): diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py index 8b6d01969a..8e4510355d 100644 --- a/services/fuse/tests/test_mount.py +++ b/services/fuse/tests/test_mount.py @@ -1163,6 +1163,7 @@ class TokenExpiryTest(MountTestBase): def setUp(self): super(TokenExpiryTest, self).setUp(local_store=False) + @unittest.skip("bug #10008") @mock.patch('arvados.keep.KeepClient.get') def runTest(self, mocked_get): self.api._rootDesc = {"blobSignatureTtl": 2}