try:
self.cond.acquire()
j = self.processes[uuid]
- logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+ txt = self.work_api[0].upper() + self.work_api[1:-1]
+ logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
with Perf(metrics, "done %s" % j.name):
j.done(event["properties"]["new_attributes"])
self.cond.notify()
self.final_output_collection = final
+ def set_crunch_output(self):
+ if self.work_api == "containers":
+ try:
+ current = self.api.containers().current().execute(num_retries=self.num_retries)
+ self.api.containers().update(uuid=current['uuid'],
+ body={
+ 'output': self.final_output_collection.portable_data_hash(),
+ }).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.info("Setting container output: %s", e)
+ elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
+ self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
+ body={
+ 'output': self.final_output_collection.portable_data_hash(),
+ 'success': self.final_status == "success",
+ 'progress':1.0
+ }).execute(num_retries=self.num_retries)
+
def arv_executor(self, tool, job_order, **kwargs):
self.debug = kwargs.get("debug")
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
- if self.final_status != "success":
- raise WorkflowException("Workflow failed.")
-
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
if self.output_name is None:
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
self.make_output_collection(self.output_name, self.final_output)
+ self.set_crunch_output()
+
+ if self.final_status != "success":
+ raise WorkflowException("Workflow failed.")
if kwargs.get("compute_checksum"):
adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
self.arvrunner.processes[response["container_uuid"]] = self
- logger.info("Container %s (%s) request state is %s", self.name, response["uuid"], response["state"])
+ container = self.arvrunner.api.containers().get(
+ uuid=response["container_uuid"]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ logger.info("Container request %s (%s) state is %s with container %s %s", self.name, response["uuid"], response["state"], container["uuid"], container["state"])
- if response["state"] == "Final":
- self.done(response)
+ if container["state"] in ("Complete", "Cancelled"):
+ self.done(container)
except Exception as e:
logger.error("Got error %s" % str(e))
self.output_callback({}, "permanentFail")
from arvados.api import OrderedJsonModel
from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
from cwltool.load_tool import load_tool
+from cwltool.errors import WorkflowException
logger = logging.getLogger('arvados.cwl-runner')
arvados_cwl.add_arv_hints()
+ runner = None
try:
job_order_object = arvados.current_job()['script_parameters']
args.basedir = os.getcwd()
args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
outputObj = runner.arv_executor(t, job_order_object, **vars(args))
-
- if runner.final_output_collection:
+ except Exception as e:
+ if isinstance(e, WorkflowException):
+ logging.info("Workflow error %s", e)
+ else:
+ logging.exception("Unhandled exception")
+ if runner and runner.final_output_collection:
outputCollection = runner.final_output_collection.portable_data_hash()
else:
outputCollection = None
-
api.job_tasks().update(uuid=arvados.current_task()['uuid'],
body={
'output': outputCollection,
- 'success': True,
- 'progress':1.0
- }).execute()
- except Exception as e:
- logging.exception("Unhandled exception")
- api.job_tasks().update(uuid=arvados.current_task()['uuid'],
- body={
- 'output': None,
'success': False,
'progress':1.0
}).execute()
accept_attribute_as_json :runtime_constraints, Hash
accept_attribute_as_json :command, Array
+ skip_before_filter :find_object_by_uuid, only: [:current]
+ skip_before_filter :render_404_if_no_object, only: [:current]
+
def auth
if @object.locked_by_uuid != Thread.current[:api_client_authorization].uuid
raise ArvadosModel::PermissionDeniedError.new("Not locked by your token")
@object.unlock
show
end
+
+ def current
+ if Thread.current[:api_client_authorization].nil?
+ send_error("Not logged in", status: 401)
+ else
+ c = Container.where(auth_uuid: Thread.current[:api_client_authorization].uuid).first
+ if c.nil?
+ send_error("Token is not associated with a container.", status: 404)
+ else
+ @object = c
+ show
+ end
+ end
+ end
end
validate :validate_state_change
validate :validate_change
validate :validate_lock
+ validate :validate_output
after_validation :assign_auth
before_save :sort_serialized_attrs
after_save :handle_completed
end
def permission_to_update
- current_user.andand.is_admin
+ # Override base permission check to allow auth_uuid to set progress and
+ # output (only). Whether it is legal to set progress and output in the current
+ # state has already been checked in validate_change.
+ current_user.andand.is_admin ||
+ (!current_api_client_authorization.nil? and
+ [self.auth_uuid, self.locked_by_uuid].include? current_api_client_authorization.uuid)
+ end
+
+ def ensure_owner_uuid_is_permitted
+ # Override base permission check to allow auth_uuid to set progress and
+ # output (only). Whether it is legal to set progress and output in the current
+ # state has already been checked in validate_change.
+ if !current_api_client_authorization.nil? and self.auth_uuid == current_api_client_authorization.uuid
+ check_update_whitelist [:progress, :output]
+ else
+ super
+ end
end
def set_timestamps
permitted.push :priority
when Running
- permitted.push :priority, :progress
+ permitted.push :priority, :progress, :output
if self.state_changed?
permitted.push :started_at
end
end
def validate_lock
- # If the Container is already locked by someone other than the
- # current api_client_auth, disallow all changes -- except
- # priority, which needs to change to reflect max(priority) of
- # relevant ContainerRequests.
- if locked_by_uuid_was
- if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
- check_update_whitelist [:priority]
- end
- end
-
if [Locked, Running].include? self.state
# If the Container was already locked, locked_by_uuid must not
# changes. Otherwise, the current auth gets the lock.
- need_lock = locked_by_uuid_was || Thread.current[:api_client_authorization].uuid
+ need_lock = locked_by_uuid_was || current_api_client_authorization.andand.uuid
else
need_lock = nil
end
self.locked_by_uuid = need_lock
end
+ def validate_output
+ # Output must exist and be readable by the current user. This is so
+ # that a container cannot "claim" a collection that it doesn't otherwise
+ # have access to just by setting the output field to the collection PDH.
+ if output_changed?
+ c = Collection.
+ readable_by(current_user).
+ where(portable_data_hash: self.output).
+ first
+ if !c
+ errors.add :output, "collection must exist and be readable by current user."
+ end
+ end
+ end
+
def assign_auth
if self.auth_uuid_changed?
return errors.add :auth_uuid, 'is readonly'
get 'auth', on: :member
post 'lock', on: :member
post 'unlock', on: :member
+ get 'current', on: :collection
end
resources :container_requests
resources :jobs do
user: system_user
api_token: kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw
expires_at: 2038-01-01 00:00:00
+
+running_container_auth:
+ uuid: zzzzz-gj3su-077z32aux8dg2s2
+ api_client: untrusted
+ user: active
+ api_token: 3kg6k6lzmp9kj6bpkcoxie963cmvjahbt2fod9zru30k1jqdmi
+ expires_at: 2038-01-01 00:00:00
+
+not_running_container_auth:
+ uuid: zzzzz-gj3su-077z32aux8dg2s3
+ api_client: untrusted
+ user: active
+ api_token: 4kg6k6lzmp9kj6bpkcoxie963cmvjahbt2fod9zru30k1jqdmj
+ expires_at: 2038-01-01 00:00:00
manifest_text: ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:file 0:0:file.bam\n"
name: collection_with_several_unsupported_file_types
+collection_not_readable_by_active:
+ uuid: zzzzz-4zz18-cd42uwvy3neko21
+ portable_data_hash: bb89eb5140e2848d39b416daeef4ffc5+45
+ owner_uuid: zzzzz-tpzed-000000000000000
+ created_at: 2014-02-03T17:22:54Z
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-d9tiejq69daie8f
+ modified_at: 2014-02-03T17:22:54Z
+ updated_at: 2014-02-03T17:22:54Z
+ manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
+ name: collection_not_readable_by_active
+
+
# Test Helper trims the rest of the file
# Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
runtime_constraints:
ram: 12000000000
vcpus: 4
- auth_uuid: zzzzz-gj3su-077z32aux8dg2s1
+ auth_uuid: zzzzz-gj3su-077z32aux8dg2s2
running_older:
uuid: zzzzz-dz642-runningcontain2
runtime_constraints:
ram: 12000000000
vcpus: 4
- auth_uuid: zzzzz-gj3su-077z32aux8dg2s1
+ auth_uuid: zzzzz-gj3su-077z32aux8dg2s3
failed_container:
uuid: zzzzz-dz642-failedcontainr1
assert_equal state, Container.where(uuid: uuid).first.state
end
end
+
+ test 'get current container for token' do
+ authorize_with :running_container_auth
+ get :current
+ assert_response :success
+ assert_equal containers(:running).uuid, json_response['uuid']
+ end
+
+ test 'no container associated with token' do
+ authorize_with :dispatch1
+ get :current
+ assert_response 404
+ end
+
+ test 'try get current container, no token' do
+ get :current
+ assert_response 401
+ end
+
end
end
[
- ['active', 'zzzzz-dz642-runningcontainr'],
+ ['running_container_auth', 'zzzzz-dz642-runningcontainr'],
['active_no_prefs', nil],
].each do |token, expected|
test "create as #{token} and expect requesting_container_uuid to be #{expected}" do
DEFAULT_ATTRS = {
command: ['echo', 'foo'],
- container_image: 'img',
+ container_image: 'fa3c1a9cb6783f85f2ecda037e07b8c3+167',
output_path: '/tmp',
priority: 1,
runtime_constraints: {"vcpus" => 1, "ram" => 1},
}
- REUSABLE_COMMON_ATTRS = {container_image: "test",
+ REUSABLE_COMMON_ATTRS = {container_image: "9ae44d5792468c58bcf85ce7353c7027+124",
cwd: "test",
command: ["echo", "hello"],
output_path: "test",
def minimal_new attrs={}
cr = ContainerRequest.new DEFAULT_ATTRS.merge(attrs)
+ cr.state = ContainerRequest::Committed
act_as_user users(:active) do
cr.save!
end
- c = Container.new DEFAULT_ATTRS.merge(attrs)
- act_as_system_user do
- c.save!
- assert cr.update_attributes(container_uuid: c.uuid,
- state: ContainerRequest::Committed,
- ), show_errors(cr)
- end
+ c = Container.find_by_uuid cr.container_uuid
+ assert_not_nil c
return c, cr
end
def check_illegal_modify c
check_illegal_updates c, [{command: ["echo", "bar"]},
- {container_image: "img2"},
+ {container_image: "arvados/apitestfixture:june10"},
{cwd: "/tmp2"},
{environment: {"FOO" => "BAR"}},
{mounts: {"FOO" => "BAR"}},
test "Container serialized hash attributes sorted before save" do
env = {"C" => 3, "B" => 2, "A" => 1}
- m = {"F" => 3, "E" => 2, "D" => 1}
+ m = {"F" => {"kind" => 3}, "E" => {"kind" => 2}, "D" => {"kind" => 1}}
rc = {"vcpus" => 1, "ram" => 1}
c, _ = minimal_new(environment: env, mounts: m, runtime_constraints: rc)
assert_equal c.environment.to_json, Container.deep_sort_hash(env).to_json
test "find_reusable method should not select completed container when inconsistent outputs exist" do
set_user_from_auth :active
- common_attrs = REUSABLE_COMMON_ATTRS.merge({environment: {"var" => "complete"}})
+ common_attrs = REUSABLE_COMMON_ATTRS.merge({environment: {"var" => "complete"}, priority: 1})
completed_attrs = {
state: Container::Complete,
exit_code: 0,
log: 'ea10d51bcf88862dbcc36eb292017dfd+45',
}
- c_output1, _ = minimal_new(common_attrs)
- c_output2, _ = minimal_new(common_attrs)
-
set_user_from_auth :dispatch1
+
+ c_output1 = Container.create common_attrs
+ c_output2 = Container.create common_attrs
+
+ cr = ContainerRequest.new common_attrs
+ cr.state = ContainerRequest::Committed
+ cr.container_uuid = c_output1.uuid
+ cr.save!
+
+ cr = ContainerRequest.new common_attrs
+ cr.state = ContainerRequest::Committed
+ cr.container_uuid = c_output2.uuid
+ cr.save!
+
c_output1.update_attributes!({state: Container::Locked})
c_output1.update_attributes!({state: Container::Running})
c_output1.update_attributes!(completed_attrs.merge({output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45'}))
assert c.update_attributes(exit_code: 1, state: Container::Complete)
end
+
+ test "locked_by_uuid can set output on running container" do
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ c.lock
+ c.update_attributes! state: Container::Running
+
+ assert_equal c.locked_by_uuid, Thread.current[:api_client_authorization].uuid
+
+ assert c.update_attributes output: collections(:collection_owned_by_active).portable_data_hash
+ assert c.update_attributes! state: Container::Complete
+ end
+
+ test "auth_uuid can set output on running container, but not change container state" do
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ c.lock
+ c.update_attributes! state: Container::Running
+
+ Thread.current[:api_client_authorization] = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
+ Thread.current[:user] = User.find_by_id(Thread.current[:api_client_authorization].user_id)
+ assert c.update_attributes output: collections(:collection_owned_by_active).portable_data_hash
+
+ assert_raises ArvadosModel::PermissionDeniedError do
+ # auth_uuid cannot set container state
+ c.update_attributes state: Container::Complete
+ end
+ end
+
+ test "not allowed to set output that is not readable by current user" do
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ c.lock
+ c.update_attributes! state: Container::Running
+
+ Thread.current[:api_client_authorization] = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
+ Thread.current[:user] = User.find_by_id(Thread.current[:api_client_authorization].user_id)
+
+ assert_raises ActiveRecord::RecordInvalid do
+ c.update_attributes! output: collections(:collection_not_readable_by_active).portable_data_hash
+ end
+ end
+
+ test "other token cannot set output on running container" do
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ c.lock
+ c.update_attributes! state: Container::Running
+
+ set_user_from_auth :not_running_container_auth
+ assert_raises ArvadosModel::PermissionDeniedError do
+ c.update_attributes! output: collections(:foo_file).portable_data_hash
+ end
+ end
+
end
return nil
}
+ if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+ // Output may have been set directly by the container, so
+ // refresh the container record to check.
+ err := runner.ArvClient.Get("containers", runner.Container.UUID,
+ nil, &runner.Container)
+ if err != nil {
+ return err
+ }
+ if runner.Container.Output != "" {
+ // Container output is already set.
+ runner.OutputPDH = &runner.Container.Output
+ return nil
+ }
+ }
+
if runner.HostOutputDir == "" {
return nil
}
stop chan bool
cwd string
env []string
+ api *ArvTestClient
}
func NewTestDockerClient() *TestDockerClient {
docker.RemoveImage(hwImageId, true)
api = &ArvTestClient{Container: rec}
+ docker.api = api
cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.statInterval = 100 * time.Millisecond
am := &ArvMountCmdLine{}
c.Check(err, NotNil)
c.Check(strings.Contains(err.Error(), "Unsupported mount kind 'collection' for stdout"), Equals, true)
}
+
+func (s *TestSuite) TestFullRunWithAPI(c *C) {
+ os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
+ defer os.Unsetenv("ARVADOS_API_HOST")
+ api, _ := FullRunHelper(c, `{
+ "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {"API": true}
+}`, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, t.env[1][17:]+"\n"))
+ t.logWriter.Close()
+ t.finish <- dockerclient.WaitResult{ExitCode: 0}
+ })
+
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "test.arvados.org\n"), Equals, true)
+ c.Check(api.CalledWith("container.output", "d41d8cd98f00b204e9800998ecf8427e+0"), NotNil)
+}
+
+func (s *TestSuite) TestFullRunSetOutput(c *C) {
+ os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
+ defer os.Unsetenv("ARVADOS_API_HOST")
+ api, _ := FullRunHelper(c, `{
+ "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {"API": true}
+}`, func(t *TestDockerClient) {
+ t.api.Container.Output = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
+ t.logWriter.Close()
+ t.finish <- dockerclient.WaitResult{ExitCode: 0}
+ })
+
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ c.Check(api.CalledWith("container.output", "d4ab34d3d4f8a72f5c4973051ae69fab+122"), NotNil)
+}