logger.info("Pipeline instance %s", self.pipeline["uuid"])
if runnerjob and not kwargs.get("wait"):
- runnerjob.run()
+ runnerjob.run(wait=kwargs.get("wait"))
return runnerjob.uuid
self.poll_api = arvados.api('v1')
import re
import copy
import json
+import time
from cwltool.process import get_feature, shortname
from cwltool.errors import WorkflowException
workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ # cwltool adds a "job_order" entry when parameters are supplied on the
+ # command line, but arv-run-pipeline-instance doesn't accept it, so
+ # filter it out here.
+ if "job_order" in self.job_order:
+ del self.job_order["job_order"]
+
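+ # The mapped workflow location is a "keep:" URI; strip the scheme so
+ # "cwl:tool" holds the plain Keep path.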
self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
if self.output_name:
self.job_order["arv:output_name"] = self.output_name
def run(self, *args, **kwargs):
job_spec = self.arvados_job_spec(*args, **kwargs)
- job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-
- response = self.arvrunner.api.jobs().create(
- body=job_spec,
- find_or_create=self.enable_reuse
- ).execute(num_retries=self.arvrunner.num_retries)
- self.uuid = response["uuid"]
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
+ body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": shortname(self.tool.tool["id"]),
+ "components": {"cwl-runner": job_spec },
+ "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
+ logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
+
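+ # With --no-wait, just record the pipeline instance UUID and return
+ # without waiting for the runner job to be created.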
+ if kwargs.get("wait") is False:
+ self.uuid = self.arvrunner.pipeline["uuid"]
+ return
+
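+ # Poll the pipeline instance until the server-side dispatcher has
+ # created the cwl-runner job.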
+ job = None
+ while not job:
+ time.sleep(2)
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().get(
+ uuid=self.arvrunner.pipeline["uuid"]).execute(
+ num_retries=self.arvrunner.num_retries)
+ job = self.arvrunner.pipeline["components"]["cwl-runner"].get("job")
+
+ self.uuid = job["uuid"]
self.arvrunner.processes[self.uuid] = self
- logger.info("Submitted job %s", response["uuid"])
-
- if kwargs.get("submit"):
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": shortname(self.tool.tool["id"]),
- "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
- "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
- if response["state"] in ("Complete", "Failed", "Cancelled"):
- self.done(response)
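+ # The job may already be finished (for example, if an existing job was
+ # reused); record its outcome right away in that case.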
+ if job["state"] in ("Complete", "Failed", "Cancelled"):
+ self.done(job)
class RunnerTemplate(object):
'script_version': 'master',
'script': 'cwl-runner'
}
+ stubs.pipeline_component = stubs.expect_job_spec.copy()
+ stubs.expect_pipeline_instance = {
+ 'name': 'submit_wf.cwl',
+ 'state': 'RunningOnServer',
+ "components": {
+ "cwl-runner": {
+ 'runtime_constraints': {'docker_image': 'arvados/jobs'},
+ 'script_parameters': {
+ 'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'},
+ 'x': {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'},
+ 'z': {'basename': 'anonymous', 'class': 'Directory',
+ 'listing': [
+ {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
+ ]},
+ 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+ },
+ 'repository': 'arvados',
+ 'script_version': 'master',
+ 'script': 'cwl-runner'
+ }
+ }
+ }
+ stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
+ stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+ stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid
+ stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
+ stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
+ "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+ "state": "Queued"
+ }
+ stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
+ stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
stubs.expect_container_spec = {
'priority': 1,
class TestSubmit(unittest.TestCase):
+ @mock.patch("time.sleep")
@stubs
- def test_submit(self, stubs):
+ def test_submit(self, stubs, tm):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--submit", "--no-wait",
+ ["--submit", "--no-wait", "--debug",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
}, ensure_unique_name=True),
mock.call().execute()])
- expect_job = copy.deepcopy(stubs.expect_job_spec)
- expect_job["owner_uuid"] = stubs.fake_user_uuid
- stubs.api.jobs().create.assert_called_with(
- body=expect_job,
- find_or_create=True)
+ expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+ stubs.api.pipeline_instances().create.assert_called_with(
+ body=expect_pipeline)
self.assertEqual(capture_stdout.getvalue(),
- stubs.expect_job_uuid + '\n')
+ stubs.expect_pipeline_uuid + '\n')
+ @mock.patch("time.sleep")
@stubs
- def test_submit_with_project_uuid(self, stubs):
+ def test_submit_with_project_uuid(self, stubs, tm):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
exited = arvados_cwl.main(
sys.stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- expect_body = copy.deepcopy(stubs.expect_job_spec)
- expect_body["owner_uuid"] = project_uuid
- stubs.api.jobs().create.assert_called_with(
- body=expect_body,
- find_or_create=True)
+ expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["owner_uuid"] = project_uuid
+ stubs.api.pipeline_instances().create.assert_called_with(
+ body=expect_pipeline)
@stubs
def test_submit_container(self, stubs):
self._writers.add(writer)
@synchronized
- def remove_writer(self, writer):
+ def remove_writer(self, writer, flush):
"""
Called from ArvadosFileWriter.close(). Remove a writer reference from the list
and do some block maintenance tasks.
"""
self._writers.remove(writer)
- if self.size() > config.KEEP_BLOCK_SIZE / 2:
+ if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
# Flush requested, or the file is not small enough to be repacked with other small files
self.flush()
elif self.closed():
def flush(self):
self.arvadosfile.flush()
- def close(self):
+ def close(self, flush=True):
if not self.closed:
- self.arvadosfile.remove_writer(self)
+ self.arvadosfile.remove_writer(self, flush)
super(ArvadosFileWriter, self).close()
def test_only_small_blocks_are_packed_together(self):
c = Collection()
# Write a couple of small files,
- with c.open("count.txt", "w") as f:
- f.write("0123456789")
- with c.open("foo.txt", "w") as foo:
- foo.write("foo")
+ f = c.open("count.txt", "w")
+ f.write("0123456789")
+ f.close(flush=False)
+ foo = c.open("foo.txt", "w")
+ foo.write("foo")
+ foo.close(flush=False)
# Then write a big file; it shouldn't be packed with the ones above
- with c.open("bigfile.txt", "w") as big:
- big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
+ big = c.open("bigfile.txt", "w")
+ big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
+ big.close(flush=False)
self.assertEqual(
c.manifest_text("."),
'. 2d303c138c118af809f39319e5d507e9+34603008 a8430a058b8fbf408e1931b794dbd6fb+13 0:34603008:bigfile.txt 34603008:10:count.txt 34603018:3:foo.txt\n')
uuids: uuid_list)
end
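+ # A container is final once it has reached a terminal state
+ # (Complete or Cancelled).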
+ def final?
+ [Complete, Cancelled].include?(self.state)
+ end
+
protected
def fill_field_defaults
def handle_completed
# This container is finished, so finalize any container requests
# associated with it.
- if self.state_changed? and [Complete, Cancelled].include? self.state
+ if self.state_changed? and self.final?
act_as_system_user do
if self.state == Cancelled
# Notify container requests associated with this container
ContainerRequest.where(container_uuid: uuid,
state: ContainerRequest::Committed).each do |cr|
- cr.container_completed!
+ cr.finalize!
end
# Try to cancel any outstanding container requests made by this container.
validate :validate_change
validate :validate_runtime_constraints
after_save :update_priority
+ after_save :finalize_if_needed
before_create :set_requesting_container_uuid
api_accessible :user, extend: :common do |t|
%w(modified_by_client_uuid container_uuid requesting_container_uuid)
end
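+ # Finalize this request immediately if it was satisfied by a container
+ # that has already finished (e.g. when reusing an existing container).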
+ def finalize_if_needed
+ if state == Committed && Container.find_by_uuid(container_uuid).final?
+ reload
+ act_as_system_user do
+ finalize!
+ end
+ end
+ end
+
# Finalize the container request after the container has
# finished/cancelled.
- def container_completed!
- update_attributes!(state: ContainerRequest::Final)
+ def finalize!
+ update_attributes!(state: Final)
c = Container.find_by_uuid(container_uuid)
['output', 'log'].each do |out_type|
pdh = c.send(out_type)
assert_equal prev_container_uuid, cr.container_uuid
end
+ test "Finalize committed request when reusing a finished container" do
+ set_user_from_auth :active
+ cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+ cr.reload
+ assert_equal ContainerRequest::Committed, cr.state
+ act_as_system_user do
+ c = Container.find_by_uuid(cr.container_uuid)
+ c.update_attributes!(state: Container::Locked)
+ c.update_attributes!(state: Container::Running)
+ c.update_attributes!(state: Container::Complete,
+ exit_code: 0,
+ output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
+ log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
+ end
+ cr.reload
+ assert_equal ContainerRequest::Final, cr.state
+
+ cr2 = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+ assert_equal cr.container_uuid, cr2.container_uuid
+ assert_equal ContainerRequest::Final, cr2.state
+
+ cr3 = create_minimal_req!(priority: 1, state: ContainerRequest::Uncommitted)
+ assert_equal ContainerRequest::Uncommitted, cr3.state
+ cr3.update_attributes!(state: ContainerRequest::Committed)
+ assert_equal cr.container_uuid, cr3.container_uuid
+ assert_equal ContainerRequest::Final, cr3.state
+ end
end
self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 256MiB)", default=256*1024*1024)
self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128MiB)", default=128*1024*1024)
+ self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
+
self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)
def __enter__(self):
llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
- if self.listen_for_events:
+ if self.listen_for_events and not self.args.disable_event_listening:
self.operations.listen_for_events()
self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
self.llfuse_thread.daemon = True
self.daemon_ctx.open()
# Subscribe to change events from API server
- if self.listen_for_events:
+ if self.listen_for_events and not self.args.disable_event_listening:
self.operations.listen_for_events()
self._llfuse_main()
run_test_server.fixture('users')['active']['uuid'])
self.assertEqual(True, self.mnt.listen_for_events)
+ @noexit
+ @mock.patch('arvados.events.subscribe')
+ def test_disable_event_listening(self, mock_subscribe):
+ args = arvados_fuse.command.ArgumentParser().parse_args([
+ '--disable-event-listening',
+ '--by-id',
+ '--foreground', self.mntdir])
+ self.mnt = arvados_fuse.command.Mount(args)
+ self.assertEqual(True, self.mnt.listen_for_events)
+ self.assertEqual(True, self.mnt.args.disable_event_listening)
+ with self.mnt:
+ pass
+ self.assertEqual(0, mock_subscribe.call_count)
+
@noexit
@mock.patch('arvados.events.subscribe')
def test_custom(self, mock_subscribe):
def setUp(self):
super(TokenExpiryTest, self).setUp(local_store=False)
+ @unittest.skip("bug #10008")
@mock.patch('arvados.keep.KeepClient.get')
def runTest(self, mocked_get):
self.api._rootDesc = {"blobSignatureTtl": 2}