# Determine whether this version of Docker supports memory+swap limits.
($exited, $stdout, $stderr) = srun_sync(
- ["srun", "--nodelist=" . $node[0]],
+ ["srun", "--nodes=1"],
[$docker_bin, 'run', '--help'],
{label => "check --memory-swap feature"});
$docker_limitmem = ($stdout =~ /--memory-swap/);
$try_user_arg = "--user=$try_user";
}
my ($exited, $stdout, $stderr) = srun_sync(
- ["srun", "--nodelist=" . $node[0]],
+ ["srun", "--nodes=1"],
["/bin/sh", "-ec",
"$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
{label => $label});
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-def arv_docker_get_image(api_client, dockerRequirement, pull_image):
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
if not images:
imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
- args = [image_name]
+ args = ["--project-uuid="+project_uuid, image_name]
if image_tag:
args.append(image_tag)
- logger.info("Uploading Docker image %s", ":".join(args))
+ logger.info("Uploading Docker image %s", ":".join(args[1:]))
arvados.commands.keepdocker.main(args)
return dockerRequirement["dockerImageId"]
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
- runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
+ runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
resources = self.builder.resources
if resources is not None:
try:
response = self.arvrunner.api.jobs().create(body={
+ "owner_uuid": self.arvrunner.project_uuid,
"script": "crunchrunner",
"repository": "arvados",
"script_version": "master",
arvrunner.api,
dry_run=kwargs.get("dry_run"),
num_retries=3,
- fnPattern="$(task.keep)/%s/%s")
+ fnPattern="$(task.keep)/%s/%s",
+ project=arvrunner.project_uuid)
for src, ab, st in uploadfiles:
arvrunner.add_uploaded(src, (ab, st.fn))
def on_message(self, event):
if "object_uuid" in event:
- if event["object_uuid"] in self.jobs and event["event_type"] == "update":
- if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
- uuid = event["object_uuid"]
- with self.lock:
- j = self.jobs[uuid]
- logger.info("Job %s (%s) is Running", j.name, uuid)
- j.running = True
- j.update_pipeline_component(event["properties"]["new_attributes"])
- elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
- uuid = event["object_uuid"]
- try:
- self.cond.acquire()
- j = self.jobs[uuid]
- logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
- j.done(event["properties"]["new_attributes"])
- self.cond.notify()
- finally:
- self.cond.release()
+ if event["object_uuid"] in self.jobs and event["event_type"] == "update":
+ if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
+ uuid = event["object_uuid"]
+ with self.lock:
+ j = self.jobs[uuid]
+ logger.info("Job %s (%s) is Running", j.name, uuid)
+ j.running = True
+ j.update_pipeline_component(event["properties"]["new_attributes"])
+ elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+ uuid = event["object_uuid"]
+ try:
+ self.cond.acquire()
+ j = self.jobs[uuid]
+ logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+ j.done(event["properties"]["new_attributes"])
+ self.cond.notify()
+ finally:
+ self.cond.release()
    def get_uploaded(self):
        """Return a shallow copy of the uploaded-files map.

        Returning a copy gives the caller an independent snapshot, so
        iterating the result cannot be affected by later updates to the
        internal map.  NOTE(review): presumably keys are source paths and
        values are the (ab, st.fn) pairs recorded via add_uploaded() —
        confirm against the add_uploaded() call sites.
        """
        return self.uploaded.copy()
kwargs["outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
+ useruuid = self.api.users().current().execute()["uuid"]
+ self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+
if kwargs.get("conformance_test"):
return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
else:
- self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
- "components": {},
- "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+ self.pipeline = self.api.pipeline_instances().create(
+ body={
+ "owner_uuid": self.project_uuid,
+ "name": shortname(tool.tool["id"]),
+ "components": {},
+ "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+
+ logger.info("Pipeline instance %s", self.pipeline["uuid"])
jobiter = tool.job(job_order,
input_basedir,
**kwargs)
try:
+ self.cond.acquire()
+ # Will continue to hold the lock for the duration of this code
+ # except when in cond.wait(), at which point on_message can update
+ # job state and process output callbacks.
+
for runnable in jobiter:
if runnable:
- with self.lock:
- runnable.run(**kwargs)
+ runnable.run(**kwargs)
else:
if self.jobs:
- try:
- self.cond.acquire()
- self.cond.wait(1)
- except RuntimeError:
- pass
- finally:
- self.cond.release()
+ self.cond.wait(1)
else:
- logger.error("Workflow cannot make any more progress.")
+ logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
break
while self.jobs:
- try:
- self.cond.acquire()
- self.cond.wait(1)
- except RuntimeError:
- pass
- finally:
- self.cond.release()
+ self.cond.wait(1)
events.close()
if self.final_output is None:
raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+ # create final output collection
except:
- if sys.exc_info()[0] is not KeyboardInterrupt:
+ if sys.exc_info()[0] is KeyboardInterrupt:
+ logger.error("Interrupted, marking pipeline as failed")
+ else:
logger.exception("Caught unhandled exception, marking pipeline as failed")
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ finally:
+ self.cond.release()
return self.final_output
parser = cwltool.main.arg_parser()
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--enable-reuse", action="store_true",
- default=False, dest="enable_reuse",
+ default=True, dest="enable_reuse",
help="")
exgroup.add_argument("--disable-reuse", action="store_false",
- default=False, dest="enable_reuse",
+ default=True, dest="enable_reuse",
help="")
+ parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
try:
runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
'bin/arvados-cwl-runner'
],
install_requires=[
- 'cwltool>=1.0.20160308152645',
+ 'cwltool>=1.0.20160311170456',
'arvados-python-client>=0.1.20160219154918'
],
test_suite='tests',
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
def test_run(self):
runner = mock.MagicMock()
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
tool = {
"inputs": [],
"outputs": [],
for j in arvtool.job({}, "", mock.MagicMock()):
j.run()
runner.api.jobs().create.assert_called_with(body={
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'runtime_constraints': {},
'script_parameters': {
'tasks': [{
# For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
def test_resource_requirements(self):
runner = mock.MagicMock()
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
tool = {
"inputs": [],
"outputs": [],
for j in arvtool.job({}, "", mock.MagicMock()):
j.run()
runner.api.jobs().create.assert_called_with(body={
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'runtime_constraints': {},
'script_parameters': {
'tasks': [{
def find_objects_for_index
# Here we are deliberately less helpful about searching for client
# authorizations. We look up tokens belonging to the current user
- # and filter by exact matches on api_token and scopes.
+ # and filter by exact matches on uuid, api_token, and scopes.
wanted_scopes = []
if @filters
wanted_scopes.concat(@filters.map { |attr, operator, operand|
((attr == 'scopes') and (operator == '=')) ? operand : nil
})
@filters.select! { |attr, operator, operand|
- ((attr == 'uuid') and (operator == '=')) || ((attr == 'api_token') and (operator == '='))
+ operator == '=' && (attr == 'uuid' || attr == 'api_token')
}
end
if @where
wanted_scopes << @where['scopes']
- @where.select! { |attr, val| attr == 'uuid' }
+ @where.select! { |attr, val|
+ # "where":{"uuid":"zzzzz-zzzzz-zzzzzzzzzzzzzzz"} is OK but
+ # "where":{"api_client_id":1} is not supported
+ # "where":{"uuid":["contains","-"]} is not supported
+ # "where":{"uuid":["uuid1","uuid2","uuid3"]} is not supported
+ val.is_a?(String) && (attr == 'uuid' || attr == 'api_token')
+ }
end
@objects = model_class.
includes(:user, :api_client).
end
def find_object_by_uuid
- @object = model_class.where(uuid: (params[:uuid] || params[:id])).first
+ uuid_param = params[:uuid] || params[:id]
+ if (uuid_param != current_api_client_authorization.andand.uuid and
+ not Thread.current[:api_client].andand.is_trusted)
+ return forbidden
+ end
+ @limit = 1
+ @offset = 0
+ @orders = []
+ @where = {}
+ @filters = [['uuid', '=', uuid_param]]
+ find_objects_for_index
+ @object = @objects.first
end
def current_api_client_is_trusted
- unless Thread.current[:api_client].andand.is_trusted
- if params["action"] == "show"
- if @object and @object['api_token'] == current_api_client_authorization.andand.api_token
- return true
- end
- elsif params["action"] == "index" and @objects.andand.size == 1
- filters = @filters.map{|f|f.first}.uniq
- if ['uuid'] == filters
- return true if @objects.first['api_token'] == current_api_client_authorization.andand.api_token
- elsif ['api_token'] == filters
- return true if @objects.first[:user_id] = current_user.id
- end
- end
- send_error('Forbidden: this API client cannot manipulate other clients\' access tokens.',
- status: 403)
+ if Thread.current[:api_client].andand.is_trusted
+ return true
+ end
+ # A non-trusted client can do a search for its own token if it
+ # explicitly restricts the search to its own UUID or api_token.
+ # Any other kind of query must return 403, even if it matches only
+ # the current token, because that's currently how Workbench knows
+ # (after searching on scopes) the difference between "the token
+ # I'm using now *is* the only sharing token for this collection"
+ # (403) and "my token is trusted, and there is one sharing token
+ # for this collection" (200).
+ #
+ # The @filters test here also prevents a non-trusted token from
+ # filtering on its own scopes, and discovering whether any _other_
+ # equally scoped tokens exist (403=yes, 200=no).
+ if (@objects.andand.count == 1 and
+ @objects.first.uuid == current_api_client_authorization.andand.uuid and
+ (@filters.map(&:first) & %w(uuid api_token)).any?)
+ return true
end
+ forbidden
+ end
+
+ def forbidden
+ send_error('Forbidden: this API client cannot manipulate other clients\' access tokens.',
+ status: 403)
end
end
def permission_to_update
(permission_to_create and
- not self.user_id_changed? and
- not self.owner_uuid_changed?)
+ not uuid_changed? and
+ not user_id_changed? and
+ not owner_uuid_changed?)
end
def log_update
end
end
- [
- [:admin, :admin, 200],
- [:admin, :active, 403],
- [:admin, :admin_vm, 403], # this belongs to the user of current session, but we can't get it by uuid
- [:admin_trustedclient, :active, 200],
- ].each do |user, token, status|
- test "as user #{user} get #{token} token and expect #{status}" do
+ [# anyone can look up the token they're currently using
+ [:admin, :admin, 200, 200, 1],
+ [:active, :active, 200, 200, 1],
+ # cannot look up other tokens (even for same user) if not trustedclient
+ [:admin, :active, 403, 403],
+ [:admin, :admin_vm, 403, 403],
+ [:active, :admin, 403, 403],
+ # cannot look up other tokens for other users, regardless of trustedclient
+ [:admin_trustedclient, :active, 404, 200, 0],
+ [:active_trustedclient, :admin, 404, 200, 0],
+ ].each do |user, token, expect_get_response, expect_list_response, expect_list_items|
+ test "using '#{user}', get '#{token}' by uuid" do
authorize_with user
- get :show, {id: api_client_authorizations(token).uuid}
- assert_response status
+ get :show, {
+ id: api_client_authorizations(token).uuid,
+ }
+ assert_response expect_get_response
+ end
+
+ test "using '#{user}', update '#{token}' by uuid" do
+ authorize_with user
+ put :update, {
+ id: api_client_authorizations(token).uuid,
+ api_client_authorization: {},
+ }
+ assert_response expect_get_response
+ end
+
+ test "using '#{user}', delete '#{token}' by uuid" do
+ authorize_with user
+ post :destroy, {
+ id: api_client_authorizations(token).uuid,
+ }
+ assert_response expect_get_response
end
- end
- [
- [:admin, :admin, 200],
- [:admin, :active, 403],
- [:admin, :admin_vm, 403], # this belongs to the user of current session, but we can't list it by uuid
- [:admin_trustedclient, :active, 200],
- ].each do |user, token, status|
- test "as user #{user} list #{token} token using uuid and expect #{status}" do
+ test "using '#{user}', list '#{token}' by uuid" do
authorize_with user
get :index, {
- filters: [['uuid','=',api_client_authorizations(token).uuid]]
+ filters: [['uuid','=',api_client_authorizations(token).uuid]],
}
- assert_response status
+ assert_response expect_list_response
+ if expect_list_items
+ assert_equal assigns(:objects).length, expect_list_items
+ end
end
- end
- [
- [:admin, :admin, 200],
- [:admin, :active, 403],
- [:admin, :admin_vm, 200], # this belongs to the user of current session, and can be listed by token
- [:admin_trustedclient, :active, 200],
- ].each do |user, token, status|
- test "as user #{user} list #{token} token using token and expect #{status}" do
+ test "using '#{user}', list '#{token}' by token" do
authorize_with user
get :index, {
- filters: [['api_token','=',api_client_authorizations(token).api_token]]
+ filters: [['api_token','=',api_client_authorizations(token).api_token]],
}
- assert_response status
+ assert_response expect_list_response
+ if expect_list_items
+ assert_equal assigns(:objects).length, expect_list_items
+ end
end
end
+
+ test "scoped token cannot change its own scopes" do
+ authorize_with :admin_vm
+ put :update, {
+ id: api_client_authorizations(:admin_vm).uuid,
+ api_client_authorization: {scopes: ['all']},
+ }
+ assert_response 403
+ end
+
+ test "token cannot change its own uuid" do
+ authorize_with :admin
+ put :update, {
+ id: api_client_authorizations(:admin).uuid,
+ api_client_authorization: {uuid: 'zzzzz-gj3su-zzzzzzzzzzzzzzz'},
+ }
+ assert_response 403
+ end
end
end
[
- {cfg: :git_repo_ssh_base, cfgval: "git@example.com:", match: %r"^git@example.com:/"},
- {cfg: :git_repo_ssh_base, cfgval: true, match: %r"^git@git.zzzzz.arvadosapi.com:/"},
+ {cfg: :git_repo_ssh_base, cfgval: "git@example.com:", match: %r"^git@example.com:"},
+ {cfg: :git_repo_ssh_base, cfgval: true, match: %r"^git@git.zzzzz.arvadosapi.com:"},
{cfg: :git_repo_ssh_base, cfgval: false, refute: /^git@/ },
- {cfg: :git_repo_https_base, cfgval: "https://example.com/", match: %r"https://example.com/"},
+ {cfg: :git_repo_https_base, cfgval: "https://example.com/", match: %r"^https://example.com/"},
{cfg: :git_repo_https_base, cfgval: true, match: %r"^https://git.zzzzz.arvadosapi.com/"},
{cfg: :git_repo_https_base, cfgval: false, refute: /^http/ },
].each do |expect|
authorize_with :active
get :index
assert_response :success
+ assert_not_empty json_response['items']
json_response['items'].each do |r|
if expect[:refute]
r['clone_urls'].each do |u|
refute_match expect[:refute], u
end
else
- assert r['clone_urls'].any? do |u|
- expect[:prefix].match u
- end
+ assert((r['clone_urls'].any? do |u|
+ expect[:match].match u
+ end),
+ "no match for #{expect[:match]} in #{r['clone_urls'].inspect}")
end
end
end
arvados_client=self._new_arvados(),
arvados_node=arvados_node,
cloud_client=self._new_cloud(),
- cloud_size=cloud_size).tell_proxy()
+ cloud_size=cloud_size).proxy()
self.booting[new_setup.actor_ref.actor_urn] = new_setup
self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
shutdown = self._node_shutdown.start(
timer_actor=self._timer, cloud_client=self._new_cloud(),
arvados_client=self._new_arvados(),
- node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
- self.shutdowns[cloud_node_id] = shutdown
+ node_monitor=node_actor.actor_ref, cancellable=cancellable)
+ self.shutdowns[cloud_node_id] = shutdown.proxy()
self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)