if [[ "$UPLOAD" != 0 ]]; then
+ if [[ "$DEBUG" -gt 0 ]]; then
+ EXTRA_UPLOAD_FLAGS=" --verbose"
+ else
+ EXTRA_UPLOAD_FLAGS=""
+ fi
+
if [[ ! -e "$WORKSPACE/packages" ]]; then
mkdir -p "$WORKSPACE/packages"
fi
timer_reset
if [ "$PYTHON_BUILD_FAILURES" -eq 0 ]; then
- /usr/local/arvados-dev/jenkins/run_upload_packages.py -v --workspace $WORKSPACE python
+ /usr/local/arvados-dev/jenkins/run_upload_packages.py $EXTRA_UPLOAD_FLAGS --workspace $WORKSPACE python
else
echo "Skipping python packages upload, there were errors building the packages"
fi
timer_reset
if [ "$GEM_BUILD_FAILURES" -eq 0 ]; then
- /usr/local/arvados-dev/jenkins/run_upload_packages.py -v --workspace $WORKSPACE gems
+ /usr/local/arvados-dev/jenkins/run_upload_packages.py $EXTRA_UPLOAD_FLAGS --workspace $WORKSPACE gems
else
echo "Skipping ruby gem upload, there were errors building the packages"
fi
s.executables << "arv-tag"
s.required_ruby_version = '>= 2.1.0'
s.add_runtime_dependency 'arvados', '~> 0.1', '>= 0.1.20150128223554'
- s.add_runtime_dependency 'google-api-client', '~> 0.6', '>= 0.6.3', '<0.9'
+ # Our google-api-client dependency used to be < 0.9, but that could be
+ # satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
+ s.add_runtime_dependency 'google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
s.add_runtime_dependency 'activesupport', '~> 3.2', '>= 3.2.13'
s.add_runtime_dependency 'json', '~> 1.7', '>= 1.7.7'
s.add_runtime_dependency 'trollop', '~> 2.0'
end
begin
- require 'curb'
- require 'rubygems'
- require 'arvados/google_api_client'
require 'json'
+ require 'net/http'
require 'pp'
- require 'trollop'
+ require 'tempfile'
+ require 'yaml'
+rescue LoadError => error
+ abort "Error loading libraries: #{error}\n"
+end
+
+begin
+ require 'rubygems'
+ # Load the gems with more requirements first, so we respect any version
+ # constraints they put on gems loaded later.
+ require 'arvados/google_api_client'
+ require 'active_support/inflector'
require 'andand'
+ require 'curb'
require 'oj'
- require 'active_support/inflector'
- require 'yaml'
- require 'tempfile'
- require 'net/http'
-rescue LoadError
+ require 'trollop'
+rescue LoadError => error
abort <<-EOS
+Error loading gems: #{error}
+
Please install all required gems:
- gem install activesupport andand curb google-api-client json oj trollop yaml
+ gem install arvados activesupport andand curb json oj trollop
EOS
end
// Returns a non-nil error if an error occurs making the API call, the
// API responds with a non-successful HTTP status, or an error occurs
// parsing the response body.
-func (c ArvadosClient) Call(method string, resourceType string, uuid string, action string, parameters Dict, output interface{}) error {
+func (c ArvadosClient) Call(method, resourceType, uuid, action string, parameters Dict, output interface{}) error {
reader, err := c.CallRaw(method, resourceType, uuid, action, parameters)
if reader != nil {
defer reader.Close()
FooBarDirCollection = "zzzzz-4zz18-foonbarfilesdir"
FooPdh = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
HelloWorldPdh = "55713e6a34081eb03609e7ad5fcad129+62"
+
+ Dispatch1Token = "kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw"
+ Dispatch1AuthUUID = "zzzzz-gj3su-k9dvestay1plssr"
)
// A valid manifest designed to test various edge cases and parsing
self.filters = [[]]
self.on_event_cb = on_event_cb
self.last_log_id = last_log_id
- self.is_closed = False
- self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
+ self.is_closed = threading.Event()
+ self._setup_event_client()
- def connect(self):
- self.ec.connect()
-
- def close_connection(self):
- self.ec.close_connection()
+ def _setup_event_client(self):
+ self.ec = _EventClient(self.url, self.filters, self.on_event,
+ self.last_log_id, self.on_closed)
+ self.ec.daemon = True
+ try:
+ self.ec.connect()
+ except Exception:
+ self.ec.close_connection()
+ raise
def subscribe(self, f, last_log_id=None):
self.filters.append(f)
self.ec.unsubscribe(f)
def close(self, code=1000, reason='', timeout=0):
- self.is_closed = True
+ self.is_closed.set()
self.ec.close(code, reason, timeout)
def on_event(self, m):
thread.interrupt_main()
def on_closed(self):
- if self.is_closed == False:
+ if not self.is_closed.is_set():
_logger.warn("Unexpected close. Reconnecting.")
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
try:
- self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
- self.ec.connect()
+ self._setup_event_client()
break
except Exception as e:
_logger.warn("Error '%s' during websocket reconnect.", e)
if tries_left == 0:
_logger.exception("EventClient thread could not contact websocket server.")
- self.is_closed = True
+ self.is_closed.set()
thread.interrupt_main()
return
+ def run_forever(self):
+ # Have to poll here to let KeyboardInterrupt get raised.
+ while not self.is_closed.wait(1):
+ pass
+
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event, poll_time, last_log_id):
if not endpoint:
raise errors.FeatureNotEnabledError(
"Server does not advertise a websocket endpoint")
+ uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
try:
- uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
client = EventClient(uri_with_token, filters, on_event, last_log_id)
- ok = False
- try:
- client.connect()
- ok = True
- return client
- finally:
- if not ok:
- client.close_connection()
- except:
+ except Exception:
_logger.warn("Failed to connect to websockets on %s" % endpoint)
raise
+ else:
+ return client
def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
import arvados
-import arvados.events
-import arvados.errors
-from datetime import datetime, timedelta, tzinfo
+import io
import logging
-import logging.handlers
import mock
import Queue
import run_test_server
-import StringIO
-import tempfile
import threading
import time
import unittest
+import arvados_testutil
+
class WebsocketTest(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
TIME_PAST = time.time()-3600
TIME_FUTURE = time.time()+3600
+ MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
def setUp(self):
self.ws = None
run_test_server.authorize_with('active')
events = Queue.Queue(100)
- logstream = StringIO.StringIO()
+ logstream = io.BytesIO()
rootLogger = logging.getLogger()
streamHandler = logging.StreamHandler(logstream)
rootLogger.addHandler(streamHandler)
# close (im)properly
if close_unexpected:
- self.ws.close_connection()
+ self.ws.ec.close_connection()
else:
self.ws.close()
def test_websocket_reconnect_retry(self, event_client_connect):
event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
- logstream = StringIO.StringIO()
+ logstream = io.BytesIO()
rootLogger = logging.getLogger()
streamHandler = logging.StreamHandler(logstream)
rootLogger.addHandler(streamHandler)
found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.")
self.assertNotEqual(found, -1)
rootLogger.removeHandler(streamHandler)
+
+ @mock.patch('arvados.events._EventClient')
+ def test_subscribe_method(self, websocket_client):
+ filters = [['object_uuid', 'is_a', 'arvados#human']]
+ client = arvados.events.EventClient(
+ self.MOCK_WS_URL, [], lambda event: None, None)
+ client.subscribe(filters[:], 99)
+ websocket_client().subscribe.assert_called_with(filters, 99)
+
+ @mock.patch('arvados.events._EventClient')
+ def test_unsubscribe(self, websocket_client):
+ filters = [['object_uuid', 'is_a', 'arvados#human']]
+ client = arvados.events.EventClient(
+ self.MOCK_WS_URL, filters[:], lambda event: None, None)
+ client.unsubscribe(filters[:])
+ websocket_client().unsubscribe.assert_called_with(filters)
+
+ @mock.patch('arvados.events._EventClient')
+ def test_run_forever_survives_reconnects(self, websocket_client):
+ connection_cond = threading.Condition()
+ def ws_connect():
+ with connection_cond:
+ connection_cond.notify_all()
+ websocket_client().connect.side_effect = ws_connect
+ client = arvados.events.EventClient(
+ self.MOCK_WS_URL, [], lambda event: None, None)
+ with connection_cond:
+ forever_thread = threading.Thread(target=client.run_forever)
+ forever_thread.start()
+ # Simulate an unexpected disconnect, and wait for reconnect.
+ close_thread = threading.Thread(target=client.on_closed)
+ close_thread.start()
+ connection_cond.wait()
+ close_thread.join()
+ run_forever_alive = forever_thread.is_alive()
+ client.close()
+ forever_thread.join()
+ self.assertTrue(run_forever_alive)
+ self.assertEqual(2, websocket_client().connect.call_count)
+
+
+class PollClientTestCase(unittest.TestCase):
+ class MockLogs(object):
+ def __init__(self):
+ self.logs = []
+ self.lock = threading.Lock()
+
+ def add(self, log):
+ with self.lock:
+ self.logs.append(log)
+
+ def return_list(self, num_retries=None):
+ with self.lock:
+ retval = self.logs
+ self.logs = []
+ return {'items': retval, 'items_available': len(retval)}
+
+
+ def setUp(self):
+ self.logs = self.MockLogs()
+ self.arv = mock.MagicMock(name='arvados.api()')
+ self.arv.logs().list().execute.side_effect = self.logs.return_list
+ self.callback_cond = threading.Condition()
+ self.recv_events = []
+
+ def tearDown(self):
+ if hasattr(self, 'client'):
+ self.client.close(timeout=None)
+
+ def callback(self, event):
+ with self.callback_cond:
+ self.recv_events.append(event)
+ self.callback_cond.notify_all()
+
+ def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
+ if filters is None:
+ filters = []
+ if callback is None:
+ callback = self.callback
+ self.client = arvados.events.PollClient(
+ self.arv, filters, callback, poll_time, last_log_id)
+
+ def was_filter_used(self, target):
+ return any(target in call[-1].get('filters', [])
+ for call in self.arv.logs().list.call_args_list)
+
+ def test_callback(self):
+ test_log = {'id': 12345, 'testkey': 'testtext'}
+ self.logs.add({'id': 123})
+ self.build_client(poll_time=.01)
+ with self.callback_cond:
+ self.client.start()
+ self.callback_cond.wait()
+ self.logs.add(test_log.copy())
+ self.callback_cond.wait()
+ self.client.close(timeout=None)
+ self.assertIn(test_log, self.recv_events)
+
+ def test_subscribe(self):
+ client_filter = ['kind', '=', 'arvados#test']
+ self.build_client()
+ self.client.subscribe([client_filter[:]])
+ with self.callback_cond:
+ self.client.start()
+ self.callback_cond.wait()
+ self.client.close(timeout=None)
+ self.assertTrue(self.was_filter_used(client_filter))
+
+ def test_unsubscribe(self):
+ client_filter = ['kind', '=', 'arvados#test']
+ self.build_client()
+ self.client.subscribe([client_filter[:]])
+ self.client.unsubscribe([client_filter[:]])
+ self.client.start()
+ self.client.close(timeout=None)
+ self.assertFalse(self.was_filter_used(client_filter))
+
+ def test_run_forever(self):
+ self.build_client()
+ with self.callback_cond:
+ self.client.start()
+ forever_thread = threading.Thread(target=self.client.run_forever)
+ forever_thread.start()
+ self.callback_cond.wait()
+ self.assertTrue(forever_thread.is_alive())
+ self.client.close()
+ forever_thread.join()
# activesupport <4.2.6 only because https://dev.arvados.org/issues/8222
s.add_dependency('activesupport', '>= 3', '< 4.2.6')
s.add_dependency('andand', '~> 1.3', '>= 1.3.3')
- s.add_dependency('google-api-client', '>= 0.7', '< 0.9')
+ # Our google-api-client dependency used to be < 0.9, but that could be
+ # satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
+ s.add_dependency('google-api-client', '>= 0.7', '< 0.8.9')
# work around undeclared dependency on i18n in some activesupport 3.x.x:
s.add_dependency('i18n', '~> 0')
s.add_dependency('json', '~> 1.7', '>= 1.7.7')
class Arvados::V1::ApiClientAuthorizationsController < ApplicationController
accept_attribute_as_json :scopes, Array
- before_filter :current_api_client_is_trusted
+ before_filter :current_api_client_is_trusted, :except => [:current]
before_filter :admin_required, :only => :create_system_auth
- skip_before_filter :render_404_if_no_object, :only => :create_system_auth
+ skip_before_filter :render_404_if_no_object, :only => [:create_system_auth, :current]
+ skip_before_filter :find_object_by_uuid, :only => [:create_system_auth, :current]
def self._create_system_auth_requires_parameters
{
super
end
+ def current
+ @object = Thread.current[:api_client_authorization]
+ show
+ end
+
protected
def default_orders
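The new `current` collection action gives a client a way to look up its own token record (notably its UUID) without knowing that record's UUID in advance. A minimal client-side sketch, reusing the same `arvadosclient` call that appears verbatim in the dispatcher code later in this diff:

```go
package main

import (
	"log"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
)

type apiClientAuthorization struct {
	UUID     string `json:"uuid"`
	APIToken string `json:"api_token"`
}

func main() {
	arv, err := arvadosclient.MakeArvadosClient()
	if err != nil {
		log.Fatalf("Error making arvados client: %s", err)
	}
	// GET /arvados/v1/api_client_authorizations/current
	var auth apiClientAuthorization
	if err := arv.Call("GET", "api_client_authorizations", "", "current", nil, &auth); err != nil {
		log.Fatalf("Error getting my token UUID: %v", err)
	}
	log.Printf("this process is authorized as %s", auth.UUID)
}
```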
accept_attribute_as_json :runtime_constraints, Hash
accept_attribute_as_json :command, Array
+ def auth
+ if @object.locked_by_uuid != Thread.current[:api_client_authorization].uuid
+ raise ArvadosModel::PermissionDeniedError.new("Not locked by your token")
+ end
+ @object = @object.auth
+ show
+ end
+
+ # Updates use row locking to resolve races between multiple
+ # dispatchers trying to lock the same container.
+ def update
+ @object.with_lock do
+ super
+ end
+ end
end
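The `auth` member action and the row-locked `update` above define the dispatcher's half of the protocol: lock the container first (which fails if another dispatcher's token already holds the lock), then fetch the container-scoped token to hand to crunch-run. A hedged sketch, assuming the `apiClientAuthorization` struct from the previous example; the helper name is illustrative, not part of this diff:

```go
// lockAndGetAuth locks a container, then fetches its container-scoped
// token via the new member action.
func lockAndGetAuth(arv arvadosclient.ArvadosClient, uuid string) (apiClientAuthorization, error) {
	var auth apiClientAuthorization
	// Locking fails with a validation error if some other token
	// already holds the lock (see validate_lock below).
	err := arv.Update("containers", uuid,
		arvadosclient.Dict{
			"container": arvadosclient.Dict{"state": "Locked"}},
		nil)
	if err != nil {
		return auth, err
	}
	// GET /arvados/v1/containers/{uuid}/auth -- responds 403 unless
	// the container is locked by the caller's own token.
	err = arv.Call("GET", "containers", uuid, "auth", nil, &auth)
	return auth, err
}
```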
supplied_token =
params["api_token"] ||
params["oauth_token"] ||
- env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-z0-9]+)/).andand[1]
+ env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1]
if supplied_token
api_client_auth = ApiClientAuthorization.
includes(:api_client, :user).
validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
validate :validate_state_change
validate :validate_change
+ validate :validate_lock
+ after_validation :assign_auth
after_save :handle_completed
has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
+ belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
api_accessible :user, extend: :common do |t|
t.add :command
t.add :environment
t.add :exit_code
t.add :finished_at
+ t.add :locked_by_uuid
t.add :log
t.add :mounts
t.add :output
t.add :runtime_constraints
t.add :started_at
t.add :state
+ t.add :auth_uuid
end
# Supported states for a container
States =
[
(Queued = 'Queued'),
+ (Locked = 'Locked'),
(Running = 'Running'),
(Complete = 'Complete'),
(Cancelled = 'Cancelled')
State_transitions = {
nil => [Queued],
- Queued => [Running, Cancelled],
+ Queued => [Locked, Cancelled],
+ Locked => [Queued, Running, Cancelled],
Running => [Complete, Cancelled]
}
end
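For reference, the expanded transition graph written as a Go table of the same shape, which a client could use to predict which state changes the API will accept. This is a sketch mirroring the Ruby constants above, not code from this diff:

```go
// stateTransitions mirrors Container::State_transitions above.
var stateTransitions = map[string][]string{
	"":        {"Queued"},
	"Queued":  {"Locked", "Cancelled"},
	"Locked":  {"Queued", "Running", "Cancelled"},
	"Running": {"Complete", "Cancelled"},
}

// transitionAllowed reports whether the server's validate_state_change
// would permit a change from one state to another.
func transitionAllowed(from, to string) bool {
	for _, next := range stateTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}
```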
def update_priority!
- if [Queued, Running].include? self.state
+ if [Queued, Locked, Running].include? self.state
# Update the priority of this container to the maximum priority of any of
# its committed container requests and save the record.
- max = 0
- ContainerRequest.where(container_uuid: uuid).each do |cr|
- if cr.state == ContainerRequest::Committed and cr.priority > max
- max = cr.priority
- end
- end
- self.priority = max
+ self.priority = ContainerRequest.
+ where(container_uuid: uuid,
+ state: ContainerRequest::Committed).
+ maximum('priority')
self.save!
end
end
end
def validate_change
- permitted = []
+ permitted = [:state]
if self.new_record?
- permitted.push :owner_uuid, :command, :container_image, :cwd, :environment,
- :mounts, :output_path, :priority, :runtime_constraints, :state
+ permitted.push(:owner_uuid, :command, :container_image, :cwd,
+ :environment, :mounts, :output_path, :priority,
+ :runtime_constraints)
end
case self.state
- when Queued
- # permit priority change only.
+ when Queued, Locked
permitted.push :priority
when Running
+ permitted.push :priority, :progress
if self.state_changed?
- # At point of state change, can set state and started_at
- permitted.push :state, :started_at
- else
- # While running, can update priority and progress.
- permitted.push :priority, :progress
+ permitted.push :started_at
end
when Complete
- if self.state_changed?
- permitted.push :state, :finished_at, :output, :log, :exit_code
- else
- errors.add :state, "cannot update record"
+ if self.state_was == Running
+ permitted.push :finished_at, :output, :log, :exit_code
end
when Cancelled
- if self.state_changed?
- if self.state_was == Running
- permitted.push :state, :finished_at, :output, :log
- elsif self.state_was == Queued
- permitted.push :state, :finished_at
- end
- else
- errors.add :state, "cannot update record"
+ case self.state_was
+ when Running
+ permitted.push :finished_at, :output, :log
+ when Queued, Locked
+ permitted.push :finished_at
end
else
- errors.add :state, "invalid state"
+ # The state_transitions check will add an error message for this
+ return false
end
check_update_whitelist permitted
end
+ def validate_lock
+ # If the Container is already locked by someone other than the
+ # current api_client_auth, disallow all changes -- except
+ # priority, which needs to change to reflect max(priority) of
+ # relevant ContainerRequests.
+ if locked_by_uuid_was
+ if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
+ check_update_whitelist [:priority]
+ end
+ end
+
+ if [Locked, Running].include? self.state
+ # If the Container was already locked, locked_by_uuid must not
+ # change. Otherwise, the current auth gets the lock.
+ need_lock = locked_by_uuid_was || Thread.current[:api_client_authorization].uuid
+ else
+ need_lock = nil
+ end
+
+ # The caller can provide a new value for locked_by_uuid, but only
+ # if it's exactly what we expect. This allows a caller to perform
+ # an update like {"state":"Queued","locked_by_uuid":null}.
+ if self.locked_by_uuid_changed?
+ if self.locked_by_uuid != need_lock
+ return errors.add :locked_by_uuid, "can only change to #{need_lock}"
+ end
+ end
+ self.locked_by_uuid = need_lock
+ end
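As the comment above notes, a caller can release the lock explicitly by naming the expected `locked_by_uuid` in the same update. A hedged client-side sketch; the helper name and the explicit null are illustrative (updating `state` alone also works, as the dispatcher's submit-failure cleanup later in this diff does):

```go
// unlockContainer returns a locked container to the queue, passing
// the expected (null) lock holder explicitly.
func unlockContainer(arv arvadosclient.ArvadosClient, uuid string) error {
	return arv.Update("containers", uuid,
		arvadosclient.Dict{
			"container": arvadosclient.Dict{
				"state":          "Queued",
				"locked_by_uuid": nil,
			}},
		nil)
}
```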
+
+ def assign_auth
+ if self.auth_uuid_changed?
+ return errors.add :auth_uuid, 'is readonly'
+ end
+ if not [Locked, Running].include? self.state
+ # don't need one
+ self.auth.andand.update_attributes(expires_at: db_current_time)
+ self.auth = nil
+ return
+ elsif self.auth
+ # already have one
+ return
+ end
+ cr = ContainerRequest.
+ where('container_uuid=? and priority>0', self.uuid).
+ order('priority desc').
+ first
+ if !cr
+ return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
+ end
+ self.auth = ApiClientAuthorization.
+ create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
+ api_client_id: 0)
+ end
+
def handle_completed
# This container is finished so finalize any associated container requests
# that are associated with this container.
self.cwd ||= "."
end
- # Turn a container request into a container.
+ # Create a new container (or find an existing one) to satisfy this
+ # request.
def resolve
- # In the future this will do things like resolve symbolic git and keep
- # references to content addresses.
- Container.create!({ :command => self.command,
- :container_image => self.container_image,
- :cwd => self.cwd,
- :environment => self.environment,
- :mounts => self.mounts,
- :output_path => self.output_path,
- :runtime_constraints => self.runtime_constraints })
+ # TODO: resolve symbolic git and keep references to content
+ # addresses.
+ c = act_as_system_user do
+ Container.create!(command: self.command,
+ container_image: self.container_image,
+ cwd: self.cwd,
+ environment: self.environment,
+ mounts: self.mounts,
+ output_path: self.output_path,
+ runtime_constraints: self.runtime_constraints)
+ end
+ self.container_uuid = c.uuid
end
def set_container
- if self.container_uuid_changed?
- if not current_user.andand.is_admin and not self.container_uuid.nil?
- errors.add :container_uuid, "can only be updated to nil."
- end
- else
- if self.state_changed?
- if self.state == Committed and (self.state_was == Uncommitted or self.state_was.nil?)
- act_as_system_user do
- self.container_uuid = self.resolve.andand.uuid
- end
- end
- end
+ if (container_uuid_changed? and
+ not current_user.andand.is_admin and
+ not container_uuid.nil?)
+ errors.add :container_uuid, "can only be updated to nil."
+ return false
+ end
+ if state_changed? and state == Committed and container_uuid.nil?
+ resolve
end
end
end
def update_priority
- if [Committed, Final].include? self.state and (self.state_changed? or
- self.priority_changed? or
- self.container_uuid_changed?)
- [self.container_uuid_was, self.container_uuid].each do |cuuid|
- unless cuuid.nil?
- c = Container.find_by_uuid cuuid
- act_as_system_user do
- c.update_priority!
- end
- end
+ if self.state_changed? or
+ self.priority_changed? or
+ self.container_uuid_changed?
+ act_as_system_user do
+ Container.
+ where('uuid in (?)',
+ [self.container_uuid_was, self.container_uuid].compact).
+ map(&:update_priority!)
end
end
end
namespace :v1 do
resources :api_client_authorizations do
post 'create_system_auth', on: :collection
+ get 'current', on: :collection
end
resources :api_clients
resources :authorized_keys
end
resources :humans
resources :job_tasks
- resources :containers
+ resources :containers do
+ get 'auth', on: :member
+ end
resources :container_requests
resources :jobs do
get 'queue', on: :collection
--- /dev/null
+class AddAuthsToContainer < ActiveRecord::Migration
+ def change
+ add_column :containers, :auth_uuid, :string
+ add_column :containers, :locked_by_uuid, :string
+ end
+end
--- /dev/null
+class AddAuthAndLockToContainerIndex < ActiveRecord::Migration
+ Columns_were = ["uuid", "owner_uuid", "modified_by_client_uuid", "modified_by_user_uuid", "state", "log", "cwd", "output_path", "output", "container_image"]
+ Columns = Columns_were + ["auth_uuid", "locked_by_uuid"]
+ def up
+ begin
+ remove_index :containers, :name => 'containers_search_index'
+ rescue
+ end
+ add_index(:containers, Columns, name: "containers_search_index")
+ end
+
+ def down
+ begin
+ remove_index :containers, :name => 'containers_search_index'
+ rescue
+ end
+ add_index(:containers, Columns_were, name: "containers_search_index")
+ end
+end
progress double precision,
priority integer,
updated_at timestamp without time zone NOT NULL,
- exit_code integer
+ exit_code integer,
+ auth_uuid character varying(255),
+ locked_by_uuid character varying(255)
);
-- Name: containers_search_index; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX containers_search_index ON containers USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, state, log, cwd, output_path, output, container_image);
+CREATE INDEX containers_search_index ON containers USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, state, log, cwd, output_path, output, container_image, auth_uuid, locked_by_uuid);
--
INSERT INTO schema_migrations (version) VALUES ('20160209155729');
-INSERT INTO schema_migrations (version) VALUES ('20160324144017');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20160324144017');
+
+INSERT INTO schema_migrations (version) VALUES ('20160506175108');
+
+INSERT INTO schema_migrations (version) VALUES ('20160509143250');
\ No newline at end of file
end
def act_as_user user
+ #auth_was = Thread.current[:api_client_authorization]
user_was = Thread.current[:user]
Thread.current[:user] = user
+ #Thread.current[:api_client_authorization] = ApiClientAuthorization.
+ # where('user_id=? and scopes is null', user.id).
+ # order('expires_at desc').
+ # first
begin
yield
ensure
Thread.current[:user] = user_was
+ #Thread.current[:api_client_authorization] = auth_was
end
end
def check_update_whitelist permitted_fields
attribute_names.each do |field|
if not permitted_fields.include? field.to_sym and self.send((field.to_s + "_changed?").to_sym)
- errors.add field, "illegal update of field"
+ errors.add field, "cannot be modified in this state"
end
end
end
def validate_state_change
if self.state_changed?
unless state_transitions[self.state_was].andand.include? self.state
- errors.add :state, "invalid state change from #{self.state_was} to #{self.state}"
+ errors.add :state, "cannot change from #{self.state_was} to #{self.state}"
return false
end
end
api_token: 4nagbkv8eap0uok7pxm72nossq5asihls3yn5p4xmvqx5t5e7p
expires_at: 2038-01-01 00:00:00
+dispatch1:
+ uuid: zzzzz-gj3su-k9dvestay1plssr
+ api_client: untrusted
+ user: system_user
+ api_token: kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw
+ expires_at: 2038-01-01 00:00:00
\ No newline at end of file
--- /dev/null
+queued:
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ state: Committed
+ priority: 1
+ created_at: 2016-01-11 11:11:11.111111111 Z
+ updated_at: 2016-01-11 11:11:11.111111111 Z
+ modified_at: 2016-01-11 11:11:11.111111111 Z
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ container_image: test
+ cwd: test
+ output_path: test
+ command: ["echo", "hello"]
+ container_uuid: zzzzz-dz642-queuedcontainer
queued:
uuid: zzzzz-dz642-queuedcontainer
- owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+ owner_uuid: zzzzz-tpzed-000000000000000
state: Queued
priority: 1
created_at: 2016-01-11 11:11:11.111111111 Z
completed:
uuid: zzzzz-dz642-compltcontainer
- owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+ owner_uuid: zzzzz-tpzed-000000000000000
state: Complete
priority: 1
created_at: 2016-01-11 11:11:11.111111111 Z
}
assert_response 403
end
+
+ test "get current token" do
+ authorize_with :active
+ get :current
+ assert_response :success
+ assert_equal(json_response['api_token'],
+ api_client_authorizations(:active).api_token)
+ end
+
+ test "get current token, no auth" do
+ get :current
+ assert_response 401
+ end
end
--- /dev/null
+require 'test_helper'
+
+class Arvados::V1::ContainersControllerTest < ActionController::TestCase
+ test 'create' do
+ authorize_with :system_user
+ post :create, {
+ container: {
+ command: ['echo', 'hello'],
+ container_image: 'test',
+ output_path: 'test',
+ },
+ }
+ assert_response :success
+ end
+
+ [[Container::Queued, :queued],
+ [Container::Complete, :completed]].each do |state, fixture|
+ test "cannot get auth in #{state} state" do
+ authorize_with :dispatch1
+ get :auth, id: containers(fixture).uuid
+ assert_response 403
+ end
+ end
+
+ test 'cannot get auth with wrong token' do
+ authorize_with :dispatch1
+ c = containers(:queued)
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+
+ authorize_with :system_user
+ get :auth, id: c.uuid
+ assert_response 403
+ end
+
+ test 'get auth' do
+ authorize_with :dispatch1
+ c = containers(:queued)
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ get :auth, id: c.uuid
+ assert_response :success
+ assert_operator 32, :<, json_response['api_token'].length
+ assert_equal 'arvados#apiClientAuthorization', json_response['kind']
+ end
+
+ test 'no auth in container response' do
+ authorize_with :dispatch1
+ c = containers(:queued)
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ get :show, id: c.uuid
+ assert_response :success
+ assert_nil json_response['auth']
+ end
+end
def auth(api_client_auth_name)
{'HTTP_AUTHORIZATION' => "OAuth2 #{api_token(api_client_auth_name)}"}
end
+
+ def show_errors model
+ return lambda { model.errors.full_messages.inspect }
+ end
end
class ActiveSupport::TestCase
assert_equal "Committed", cr.state
c = Container.find_by_uuid cr.container_uuid
- assert_equal "Queued", c.state
+ assert_equal Container::Queued, c.state
act_as_system_user do
- c.state = "Running"
- c.save!
+ c.update_attributes! state: Container::Locked
+ c.update_attributes! state: Container::Running
end
cr.reload
assert_equal "Committed", cr.state
act_as_system_user do
- c.state = "Complete"
+ c.update_attributes! state: Container::Complete
c.save!
end
require 'test_helper'
class ContainerTest < ActiveSupport::TestCase
- def check_illegal_modify c
- c.reload
- c.command = ["echo", "bar"]
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.container_image = "img2"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ include DbCurrentTime
- c.reload
- c.cwd = "/tmp2"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ DEFAULT_ATTRS = {
+ command: ['echo', 'foo'],
+ container_image: 'img',
+ output_path: '/tmp',
+ priority: 1,
+ }
- c.reload
- c.environment = {"FOO" => "BAR"}
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
+ def minimal_new attrs={}
+ cr = ContainerRequest.new DEFAULT_ATTRS.merge(attrs)
+ act_as_user users(:active) do
+ cr.save!
end
-
- c.reload
- c.mounts = {"FOO" => "BAR"}
- assert_raises(ActiveRecord::RecordInvalid) do
+ c = Container.new DEFAULT_ATTRS.merge(attrs)
+ act_as_system_user do
c.save!
+ assert cr.update_attributes(container_uuid: c.uuid,
+ state: ContainerRequest::Committed,
+ ), show_errors(cr)
end
+ return c, cr
+ end
- c.reload
- c.output_path = "/tmp3"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
+ def check_illegal_updates c, bad_updates
+ bad_updates.each do |u|
+ refute c.update_attributes(u), u.inspect
+ refute c.valid?, u.inspect
+ c.reload
end
+ end
- c.reload
- c.runtime_constraints = {"FOO" => "BAR"}
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ def check_illegal_modify c
+ check_illegal_updates c, [{command: ["echo", "bar"]},
+ {container_image: "img2"},
+ {cwd: "/tmp2"},
+ {environment: {"FOO" => "BAR"}},
+ {mounts: {"FOO" => "BAR"}},
+ {output_path: "/tmp3"},
+ {locked_by_uuid: "zzzzz-gj3su-027z32aux8dg2s1"},
+ {auth_uuid: "zzzzz-gj3su-017z32aux8dg2s1"},
+ {runtime_constraints: {"FOO" => "BAR"}}]
end
def check_bogus_states c
- c.reload
- c.state = nil
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.state = "Flubber"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ check_illegal_updates c, [{state: nil},
+ {state: "Flubber"}]
end
- def check_no_change_from_complete c
+ def check_no_change_from_cancelled c
check_illegal_modify c
check_bogus_states c
-
- c.reload
- c.priority = 3
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.state = "Queued"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.state = "Running"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.state = "Complete"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ check_illegal_updates c, [{ priority: 3 },
+ { state: Container::Queued },
+ { state: Container::Locked },
+ { state: Container::Running },
+ { state: Container::Complete }]
end
test "Container create" do
act_as_system_user do
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.cwd = "/tmp"
- c.environment = {}
- c.mounts = {"BAR" => "FOO"}
- c.output_path = "/tmp"
- c.priority = 1
- c.runtime_constraints = {}
- c.save!
+ c, _ = minimal_new(environment: {},
+ mounts: {"BAR" => "FOO"},
+ output_path: "/tmp",
+ priority: 1,
+ runtime_constraints: {})
check_illegal_modify c
check_bogus_states c
end
test "Container running" do
- act_as_system_user do
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.output_path = "/tmp"
- c.save!
+ c, _ = minimal_new priority: 1
- c.reload
- c.state = "Complete"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ set_user_from_auth :dispatch1
+ check_illegal_updates c, [{state: Container::Running},
+ {state: Container::Complete}]
- c.reload
- c.state = "Running"
- c.save!
+ c.update_attributes! state: Container::Locked
+ c.update_attributes! state: Container::Running
- check_illegal_modify c
- check_bogus_states c
+ check_illegal_modify c
+ check_bogus_states c
- c.reload
- c.state = "Queued"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ check_illegal_updates c, [{state: Container::Queued}]
+ c.reload
- c.reload
- c.priority = 3
- c.save!
- end
+ c.update_attributes! priority: 3
end
- test "Container queued cancel" do
- act_as_system_user do
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.output_path = "/tmp"
- c.save!
+ test "Lock and unlock" do
+ c, cr = minimal_new priority: 0
- c.reload
- c.state = "Cancelled"
- c.save!
+ set_user_from_auth :dispatch1
+ assert_equal Container::Queued, c.state
- check_no_change_from_complete c
- end
- end
+ refute c.update_attributes(state: Container::Locked), "no priority"
+ c.reload
+ assert cr.update_attributes priority: 1
- test "Container running cancel" do
- act_as_system_user do
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.output_path = "/tmp"
- c.save!
+ refute c.update_attributes(state: Container::Running), "not locked"
+ c.reload
+ refute c.update_attributes(state: Container::Complete), "not locked"
+ c.reload
- c.reload
- c.state = "Running"
- c.save!
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ assert c.locked_by_uuid
+ assert c.auth_uuid
- c.reload
- c.state = "Cancelled"
- c.save!
+ assert c.update_attributes(state: Container::Queued), show_errors(c)
+ refute c.locked_by_uuid
+ refute c.auth_uuid
- check_no_change_from_complete c
- end
+ refute c.update_attributes(state: Container::Running), "not locked"
+ c.reload
+ refute c.locked_by_uuid
+ refute c.auth_uuid
+
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ assert c.update_attributes(state: Container::Running), show_errors(c)
+ assert c.locked_by_uuid
+ assert c.auth_uuid
+
+ auth_uuid_was = c.auth_uuid
+
+ refute c.update_attributes(state: Container::Locked), "already running"
+ c.reload
+ refute c.update_attributes(state: Container::Queued), "already running"
+ c.reload
+
+ assert c.update_attributes(state: Container::Complete), show_errors(c)
+ refute c.locked_by_uuid
+ refute c.auth_uuid
+
+ auth_exp = ApiClientAuthorization.find_by_uuid(auth_uuid_was).expires_at
+ assert_operator auth_exp, :<, db_current_time
+ end
+
+ test "Container queued cancel" do
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ assert c.update_attributes(state: Container::Cancelled), show_errors(c)
+ check_no_change_from_cancelled c
+ end
+
+ test "Container locked cancel" do
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ assert c.update_attributes(state: Container::Cancelled), show_errors(c)
+ check_no_change_from_cancelled c
+ end
+
+ test "Container running cancel" do
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ c.update_attributes! state: Container::Queued
+ c.update_attributes! state: Container::Locked
+ c.update_attributes! state: Container::Running
+ c.update_attributes! state: Container::Cancelled
+ check_no_change_from_cancelled c
end
test "Container create forbidden for non-admin" do
set_user_from_auth :active_trustedclient
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.cwd = "/tmp"
+ c = Container.new DEFAULT_ATTRS
c.environment = {}
c.mounts = {"BAR" => "FOO"}
c.output_path = "/tmp"
end
test "Container only set exit code on complete" do
- act_as_system_user do
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.output_path = "/tmp"
- c.save!
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ c.update_attributes! state: Container::Locked
+ c.update_attributes! state: Container::Running
- c.reload
- c.state = "Running"
- c.save!
+ check_illegal_updates c, [{exit_code: 1},
+ {exit_code: 1, state: Container::Cancelled}]
- c.reload
- c.exit_code = 1
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.exit_code = 1
- c.state = "Cancelled"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.exit_code = 1
- c.state = "Complete"
- c.save!
- end
+ assert c.update_attributes(exit_code: 1, state: Container::Complete)
end
end
}(sigChan)
// Run all queued containers
- runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+ runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
// Finished dispatching; interrupt any crunch jobs that are still running
for _, cmd := range runningCmds {
// Any errors encountered are logged, but the program continues to run (it does not exit).
// This is because, once one or more crunch jobs are running,
// we need to wait for them to complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
- ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
+ ticker := time.NewTicker(pollInterval)
for {
select {
// Container data
type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
+ UUID string `json:"uuid"`
+ State string `json:"state"`
+ Priority int `json:"priority"`
+ LockedByUUID string `json:"locked_by_uuid"`
}
// ContainerList is a list of the containers from api
}
// Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
+func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
return
}
- for i := 0; i < len(containers.Items); i++ {
- log.Printf("About to run queued container %v", containers.Items[i].UUID)
+ for _, c := range containers.Items {
+ log.Printf("About to run queued container %v", c.UUID)
// Run the container
- go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+ waitGroup.Add(1)
+ go func(c Container) {
+ run(c.UUID, crunchRunCommand, pollInterval)
+ waitGroup.Done()
+ }(c)
}
}
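+// updateState sets the container's state via the API, logging any error before returning it.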
+func updateState(uuid, newState string) error {
+ err := arv.Update("containers", uuid,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": newState}},
+ nil)
+ if err != nil {
+ log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+ }
+ return err
+}
+
// Run queued container:
-// Set container state to locked (TBD)
+// Set container state to Locked
// Run container using the given crunch-run command
// Set the container state to Running
// If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
- cmd := exec.Command(crunchRunCommand, uuid)
+func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
+ if err := updateState(uuid, "Locked"); err != nil {
+ return
+ }
+ cmd := exec.Command(crunchRunCommand, uuid)
cmd.Stdin = nil
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stderr
+
+ // Add this crunch job to the list of runningCmds only if we
+ // succeed in starting crunch-run.
+ runningCmdsMutex.Lock()
if err := cmd.Start(); err != nil {
- log.Printf("Error running container for %v: %q", uuid, err)
+ log.Printf("Error starting crunch-run for %v: %q", uuid, err)
+ runningCmdsMutex.Unlock()
+ updateState(uuid, "Queued")
return
}
-
- // Add this crunch job to the list of runningCmds
- runningCmdsMutex.Lock()
runningCmds[uuid] = cmd
runningCmdsMutex.Unlock()
- log.Printf("Started container run for %v", uuid)
+ defer func() {
+ setFinalState(uuid)
- // Add this crunch job to waitGroup
- waitGroup.Add(1)
- defer waitGroup.Done()
+ // Remove the crunch job from runningCmds
+ runningCmdsMutex.Lock()
+ delete(runningCmds, uuid)
+ runningCmdsMutex.Unlock()
+ }()
- // Update container status to Running
- err := arv.Update("containers", uuid,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Running"}},
- nil)
- if err != nil {
- log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
- }
+ log.Printf("Starting container %v", uuid)
+
+ updateState(uuid, "Running")
+
+ cmdExited := make(chan struct{})
- // A goroutine to terminate the runner if container priority becomes zero
- priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+ // Kill the child process if container priority changes to zero
go func() {
- for _ = range priorityTicker.C {
+ ticker := time.NewTicker(pollInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-cmdExited:
+ return
+ case <-ticker.C:
+ }
var container Container
err := arv.Get("containers", uuid, nil, &container)
if err != nil {
- log.Printf("Error getting container info for %v: %q", uuid, err)
- } else {
- if container.Priority == 0 {
- priorityTicker.Stop()
- cmd.Process.Signal(os.Interrupt)
- }
+ log.Printf("Error getting container %v: %q", uuid, err)
+ continue
+ }
+ if container.Priority == 0 {
+ log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+ cmd.Process.Signal(os.Interrupt)
}
}
}()
- // Wait for the crunch job to exit
+ // Wait for crunch-run to exit
if _, err := cmd.Process.Wait(); err != nil {
log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
}
+ close(cmdExited)
- // Remove the crunch job to runningCmds
- runningCmdsMutex.Lock()
- delete(runningCmds, uuid)
- runningCmdsMutex.Unlock()
-
- priorityTicker.Stop()
+ log.Printf("Finished container run for %v", uuid)
+}
- // The container state should be 'Complete'
+func setFinalState(uuid string) {
+ // The container state should now be 'Complete' if everything
+ // went well. If crunch-run started the container but never
+ // moved it past 'Running', fix that now. If it never even
+ // started, cancel it as unrunnable. (TODO: Requeue instead,
+ // and fix tests so they can tell something happened even if
+ // the final state is Queued.)
var container Container
- err = arv.Get("containers", uuid, nil, &container)
- if container.State == "Running" {
- log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
- err = arv.Update("containers", uuid,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Complete"}},
- nil)
- if err != nil {
- log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
- }
+ err := arv.Get("containers", uuid, nil, &container)
+ if err != nil {
+ log.Printf("Error getting final container state: %v", err)
+ }
+ fixState := map[string]string{
+ "Running": "Complete",
+ "Locked": "Cancelled",
+ }
+ if newState, ok := fixState[container.State]; ok {
+ log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
+ updateState(uuid, newState)
}
-
- log.Printf("Finished container run for %v", uuid)
}
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "io/ioutil"
+ "bytes"
"log"
"net/http"
"net/http/httptest"
"os"
- "strings"
"syscall"
"testing"
"time"
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error updating container state")
+ testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to 'Locked' state")
}
func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
- testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error running container for zzzzz-dz642-xxxxxxxxxxxxxx3")
+ testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting crunch-run for zzzzz-dz642-xxxxxxxxxxxxxx3")
}
func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
Retries: 0,
}
- tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
- c.Check(err, IsNil)
- defer os.Remove(tempfile.Name())
- log.SetOutput(tempfile)
+ buf := bytes.NewBuffer(nil)
+ log.SetOutput(buf)
+ defer log.SetOutput(os.Stderr)
go func() {
time.Sleep(2 * time.Second)
sigChan <- syscall.SIGTERM
}()
- runQueuedContainers(1, 1, crunchCmd)
+ runQueuedContainers(time.Second, time.Second, crunchCmd)
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
- buf, _ := ioutil.ReadFile(tempfile.Name())
- c.Check(strings.Contains(string(buf), expected), Equals, true)
+ c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}
package main
import (
+ "bufio"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
return nil
}
+type apiClientAuthorization struct {
+ UUID string `json:"uuid"`
+ APIToken string `json:"api_token"`
+}
+
+type apiClientAuthorizationList struct {
+ Items []apiClientAuthorization `json:"items"`
+}
+
// Poll for queued containers using pollInterval.
// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
//
// This is because, once one or more crunch jobs are running,
// we need to wait for them to complete.
func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
- ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+ var auth apiClientAuthorization
+ err := arv.Call("GET", "api_client_authorizations", "", "current", nil, &auth)
+ if err != nil {
+ log.Printf("Error getting my token UUID: %v", err)
+ return
+ }
+ ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
for {
select {
case <-ticker.C:
- dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
+ dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
case <-doneProcessing:
ticker.Stop()
return
State string `json:"state"`
Priority int `json:"priority"`
RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
+ LockedByUUID string `json:"locked_by_uuid"`
}
// ContainerList is a list of the containers from api
Items []Container `json:"items"`
}
-// Get the list of queued containers from API server and invoke run for each container.
-func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
+// Get the list of queued containers from API server and invoke run
+// for each container.
+func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
params := arvadosclient.Dict{
- "filters": [][]string{[]string{"state", "=", "Queued"}},
+ "filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
}
var containers ContainerList
return
}
- for i := 0; i < len(containers.Items); i++ {
- log.Printf("About to submit queued container %v", containers.Items[i].UUID)
- // Run the container
- go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
+ for _, container := range containers.Items {
+ if container.State == "Locked" {
+ if container.LockedByUUID != auth.UUID {
+ // Locked by a different dispatcher
+ continue
+ } else if checkMine(container.UUID) {
+ // I already have a goroutine running
+ // for this container: it just hasn't
+ // gotten past Locked state yet.
+ continue
+ }
+ log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
+ "Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
+ container.UUID, auth.UUID)
+ setMine(container.UUID, true)
+ go func() {
+ waitContainer(container, pollInterval)
+ setMine(container.UUID, false)
+ }()
+ continue
+ }
+ go run(container, crunchRunCommand, finishCommand, pollInterval)
}
}
// sbatchCmd
func sbatchFunc(container Container) *exec.Cmd {
- memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"]*1048576)))
+ memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
return exec.Command("sbatch", "--share", "--parsable",
"--job-name="+container.UUID,
"--mem-per-cpu="+strconv.Itoa(int(memPerCPU)),
func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
submitErr = nil
- // Mark record as complete if anything errors out.
defer func() {
- if submitErr != nil {
- // This really should be an "Error" state, see #8018
- updateErr := arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Complete"}},
- nil)
- if updateErr != nil {
- log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
- }
+ // If we didn't get as far as submitting a slurm job,
+ // unlock the container and return it to the queue.
+ if submitErr == nil {
+ // OK, no cleanup needed
+ return
+ }
+ err := arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Queued"}},
+ nil)
+ if err != nil {
+ log.Printf("Error unlocking container %s: %v", container.UUID, err)
}
}()
stdoutChan := make(chan []byte)
go func() {
b, _ := ioutil.ReadAll(stdoutReader)
+ stdoutReader.Close()
stdoutChan <- b
close(stdoutChan)
}()
stderrChan := make(chan []byte)
go func() {
b, _ := ioutil.ReadAll(stderrReader)
+ stderrReader.Close()
stderrChan <- b
close(stderrChan)
}()
err := cmd.Run()
if err != nil {
log.Printf("While setting up strigger: %v", err)
+ // BUG: we drop the error here and forget about it. A
+ // human has to notice the container is stuck in
+ // Running state, and fix it manually.
}
}
-// Run a queued container.
-// Set container state to locked (TBD)
-// Submit job to slurm to execute crunch-run command for the container
-// If the container priority becomes zero while crunch job is still running, cancel the job.
-func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
+// Run a queued container: [1] Set container state to locked. [2]
+// Execute crunch-run as a slurm batch job. [3] waitContainer().
+func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
+ setMine(container.UUID, true)
+ defer setMine(container.UUID, false)
+
+ // Update container status to Locked. This will fail if
+ // another dispatcher (token) has already locked it. It will
+ // succeed if *this* dispatcher has already locked it.
+ err := arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Locked"}},
+ nil)
+ if err != nil {
+ log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
+ return
+ }
+
+ log.Printf("About to submit queued container %v", container.UUID)
jobid, err := submit(container, crunchRunCommand)
if err != nil {
- log.Printf("Error queuing container run: %v", err)
+ log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
return
}
}
finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
- // Update container status to Running, this is a temporary workaround
- // to avoid resubmitting queued containers because record locking isn't
- // implemented yet.
+ // Update container status to Running. This will fail if
+ // another dispatcher (token) has already locked it. It will
+ // succeed if *this* dispatcher has already locked it.
err = arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": "Running"}},
if err != nil {
log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
}
+ log.Printf("Submitted container %v to slurm", container.UUID)
+ waitContainer(container, pollInterval)
+}
- log.Printf("Submitted container run for %v", container.UUID)
-
- containerUUID := container.UUID
+// Wait for a container to finish. Cancel the slurm job if the
+// container priority changes to zero before it ends.
+func waitContainer(container Container, pollInterval time.Duration) {
+ log.Printf("Monitoring container %v started", container.UUID)
+ defer log.Printf("Monitoring container %v finished", container.UUID)
+
+ pollTicker := time.NewTicker(pollInterval)
+ defer pollTicker.Stop()
+ for _ = range pollTicker.C {
+ var updated Container
+ err := arv.Get("containers", container.UUID, nil, &updated)
+ if err != nil {
+ log.Printf("Error getting container %s: %q", container.UUID, err)
+ continue
+ }
+ if updated.State == "Complete" || updated.State == "Cancelled" {
+ return
+ }
+ if updated.Priority != 0 {
+ continue
+ }
- // A goroutine to terminate the runner if container priority becomes zero
- priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
- go func() {
- for _ = range priorityTicker.C {
- var container Container
- err := arv.Get("containers", containerUUID, nil, &container)
- if err != nil {
- log.Printf("Error getting container info for %v: %q", container.UUID, err)
- } else {
- if container.Priority == 0 {
- log.Printf("Canceling container %v", container.UUID)
- priorityTicker.Stop()
- cancelcmd := exec.Command("scancel", "--name="+container.UUID)
- cancelcmd.Run()
- }
- if container.State == "Complete" {
- priorityTicker.Stop()
- }
+ // Priority is zero, but state is Running or Locked
+ log.Printf("Canceling container %s", container.UUID)
+
+ err = exec.Command("scancel", "--name="+container.UUID).Run()
+ if err != nil {
+ log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+ if inQ, err := checkSqueue(container.UUID); err != nil {
+ log.Printf("Error running squeue: %v", err)
+ continue
+ } else if inQ {
+ log.Printf("Container %s is still in squeue; will retry", container.UUID)
+ continue
}
}
- }()
+ err = arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Cancelled"}},
+ nil)
+ if err != nil {
+ log.Printf("Error updating state for container %s: %s", container.UUID, err)
+ continue
+ }
+
+ return
+ }
+}
+
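+// checkSqueue reports whether a slurm job named uuid appears in the
+// "squeue --format=%j" listing.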
+func checkSqueue(uuid string) (bool, error) {
+ cmd := exec.Command("squeue", "--format=%j")
+ sq, err := cmd.StdoutPipe()
+ if err != nil {
+ return false, err
+ }
+ cmd.Start()
+ defer cmd.Wait()
+ scanner := bufio.NewScanner(sq)
+ found := false
+ for scanner.Scan() {
+ if scanner.Text() == uuid {
+ found = true
+ }
+ }
+ if err := scanner.Err(); err != nil {
+ return false, err
+ }
+ return found, nil
+}
+
+var mineMutex sync.RWMutex
+var mineMap = make(map[string]bool)
+
+// Goroutine-safely add/remove uuid to the set of "my" containers,
+// i.e., ones for which this process has a goroutine running.
+func setMine(uuid string, t bool) {
+ mineMutex.Lock()
+ if t {
+ mineMap[uuid] = true
+ } else {
+ delete(mineMap, uuid)
+ }
+ mineMutex.Unlock()
+}
+
+// Check whether there is already a goroutine running for this
+// container.
+func checkMine(uuid string) bool {
+ mineMutex.RLock()
+ defer mineMutex.RUnlock()
+ return mineMap[uuid]
}
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "bytes"
"fmt"
- "io/ioutil"
"log"
"math"
"net/http"
if err != nil {
c.Fatalf("Error making arvados client: %s", err)
}
+ os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
}
func (s *TestSuite) TearDownTest(c *C) {
}(sbatchCmd)
sbatchCmd = func(container Container) *exec.Cmd {
sbatchCmdLine = sbatchFunc(container).Args
- return exec.Command("echo", container.UUID)
+ return exec.Command("sh")
}
// Override striggerCmd
apiHost, apiToken, apiInsecure).Args
go func() {
time.Sleep(5 * time.Second)
- arv.Update("containers", containerUUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Complete"}},
- nil)
+ for _, state := range []string{"Running", "Complete"} {
+ arv.Update("containers", containerUUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": state}},
+ nil)
+ }
}()
return exec.Command("echo", "strigger")
}
c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
- "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h 1 zzzzz-dz642-queuedcontainer"})
+ "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
// There should be no queued containers now
err = arv.List("containers", params, &containers)
func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
+ apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
Retries: 0,
}
- tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
- c.Check(err, IsNil)
- defer os.Remove(tempfile.Name())
- log.SetOutput(tempfile)
+ buf := bytes.NewBuffer(nil)
+ log.SetOutput(buf)
+ defer log.SetOutput(os.Stderr)
go func() {
- time.Sleep(2 * time.Second)
+ for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+ time.Sleep(100 * time.Millisecond)
+ }
sigChan <- syscall.SIGTERM
}()
runQueuedContainers(2, 1, crunchCmd, crunchCmd)
- buf, _ := ioutil.ReadFile(tempfile.Name())
- c.Check(strings.Contains(string(buf), expected), Equals, true)
+ c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}
}
return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
"touch": fmt.Sprintf("%d", time.Now()),
- })
+ }, nil)
}
// Mtime returns the last-modified property of a block blob.