Merge branch 'master' into 9161-node-state-fixes
author Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 17 May 2016 12:50:51 +0000 (08:50 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 17 May 2016 12:50:51 +0000 (08:50 -0400)
32 files changed:
build/run-build-packages-python-and-ruby.sh
sdk/cli/arvados-cli.gemspec
sdk/cli/bin/arv
sdk/go/arvadosclient/arvadosclient.go
sdk/go/arvadostest/fixtures.go
sdk/python/arvados/events.py
sdk/python/tests/test_events.py [moved from sdk/python/tests/test_websockets.py with 65% similarity]
sdk/ruby/arvados.gemspec
services/api/app/controllers/arvados/v1/api_client_authorizations_controller.rb
services/api/app/controllers/arvados/v1/containers_controller.rb
services/api/app/middlewares/arvados_api_token.rb
services/api/app/models/container.rb
services/api/app/models/container_request.rb
services/api/config/routes.rb
services/api/db/migrate/20160506175108_add_auths_to_container.rb [new file with mode: 0644]
services/api/db/migrate/20160509143250_add_auth_and_lock_to_container_index.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/lib/current_api_client.rb
services/api/lib/whitelist_update.rb
services/api/test/fixtures/api_client_authorizations.yml
services/api/test/fixtures/container_requests.yml [new file with mode: 0644]
services/api/test/fixtures/containers.yml
services/api/test/functional/arvados/v1/api_client_authorizations_controller_test.rb
services/api/test/functional/arvados/v1/containers_controller_test.rb [new file with mode: 0644]
services/api/test/test_helper.rb
services/api/test/unit/container_request_test.rb
services/api/test/unit/container_test.rb
services/crunch-dispatch-local/crunch-dispatch-local.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/keepstore/azure_blob_volume.go

index 23ddc9a94f36b20e4ca615eb8d4e8784590ad5ec..13aa687316a5bbcca681e3fb1e9b6f7f0c0c9022 100755 (executable)
@@ -172,6 +172,12 @@ fi
 
 if [[ "$UPLOAD" != 0 ]]; then
 
+  if [[ $DEBUG > 0 ]]; then
+    EXTRA_UPLOAD_FLAGS=" --verbose"
+  else
+    EXTRA_UPLOAD_FLAGS=""
+  fi
+
   if [[ ! -e "$WORKSPACE/packages" ]]; then
     mkdir -p "$WORKSPACE/packages"
   fi
@@ -180,7 +186,7 @@ if [[ "$UPLOAD" != 0 ]]; then
   timer_reset
 
   if [ "$PYTHON_BUILD_FAILURES" -eq 0 ]; then
-    /usr/local/arvados-dev/jenkins/run_upload_packages.py -v --workspace $WORKSPACE python
+    /usr/local/arvados-dev/jenkins/run_upload_packages.py $EXTRA_UPLOAD_FLAGS --workspace $WORKSPACE python
   else
     echo "Skipping python packages upload, there were errors building the packages"
   fi
@@ -191,7 +197,7 @@ if [[ "$UPLOAD" != 0 ]]; then
   timer_reset
 
   if [ "$GEM_BUILD_FAILURES" -eq 0 ]; then
-    /usr/local/arvados-dev/jenkins/run_upload_packages.py -v --workspace $WORKSPACE gems
+    /usr/local/arvados-dev/jenkins/run_upload_packages.py $EXTRA_UPLOAD_FLAGS --workspace $WORKSPACE gems
   else
     echo "Skipping ruby gem upload, there were errors building the packages"
   fi
index 7755a5add2273baf4d4e2efbb140f9f5aa1311da..0e11f9639d1aa2b233bffe08356986b1a0e5fe0b 100644 (file)
@@ -25,7 +25,9 @@ Gem::Specification.new do |s|
   s.executables << "arv-tag"
   s.required_ruby_version = '>= 2.1.0'
   s.add_runtime_dependency 'arvados', '~> 0.1', '>= 0.1.20150128223554'
-  s.add_runtime_dependency 'google-api-client', '~> 0.6', '>= 0.6.3', '<0.9'
+  # Our google-api-client dependency used to be < 0.9, but that could be
+  # satisfied by the buggy 0.9.pre*.  https://dev.arvados.org/issues/9213
+  s.add_runtime_dependency 'google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
   s.add_runtime_dependency 'activesupport', '~> 3.2', '>= 3.2.13'
   s.add_runtime_dependency 'json', '~> 1.7', '>= 1.7.7'
   s.add_runtime_dependency 'trollop', '~> 2.0'
index 541bcdc7594faf930cc0f22de02ede3189260c53..b377ffed63796b9f5a3b83d98f4799a0beeae3d8 100755 (executable)
@@ -14,24 +14,33 @@ if RUBY_VERSION < '1.9.3' then
 end
 
 begin
-  require 'curb'
-  require 'rubygems'
-  require 'arvados/google_api_client'
   require 'json'
+  require 'net/http'
   require 'pp'
-  require 'trollop'
+  require 'tempfile'
+  require 'yaml'
+rescue LoadError => error
+  abort "Error loading libraries: #{error}\n"
+end
+
+begin
+  require 'rubygems'
+  # Load the gems with more requirements first, so we respect any version
+  # constraints they put on gems loaded later.
+  require 'arvados/google_api_client'
+  require 'active_support/inflector'
   require 'andand'
+  require 'curb'
   require 'oj'
-  require 'active_support/inflector'
-  require 'yaml'
-  require 'tempfile'
-  require 'net/http'
-rescue LoadError
+  require 'trollop'
+rescue LoadError => error
   abort <<-EOS
 
+Error loading gems: #{error}
+
 Please install all required gems:
 
-  gem install activesupport andand curb google-api-client json oj trollop yaml
+  gem install arvados activesupport andand curb json oj trollop
 
   EOS
 end
index b67eaa59a6749fb7e9b1a3da5ad2617344fc799d..8cdfa484bd96df3f479c7a40c8bb6692eddc1da5 100644 (file)
@@ -273,7 +273,7 @@ func newAPIServerError(ServerAddress string, resp *http.Response) APIServerError
 // Returns a non-nil error if an error occurs making the API call, the
 // API responds with a non-successful HTTP status, or an error occurs
 // parsing the response body.
-func (c ArvadosClient) Call(method string, resourceType string, uuid string, action string, parameters Dict, output interface{}) error {
+func (c ArvadosClient) Call(method, resourceType, uuid, action string, parameters Dict, output interface{}) error {
        reader, err := c.CallRaw(method, resourceType, uuid, action, parameters)
        if reader != nil {
                defer reader.Close()
index bebef79074ebb1758f3f5298502f235a8a02f1e1..84a3bff06c0f09e3d326925d89b30ef4deaf0804 100644 (file)
@@ -13,6 +13,9 @@ const (
        FooBarDirCollection   = "zzzzz-4zz18-foonbarfilesdir"
        FooPdh                = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
        HelloWorldPdh         = "55713e6a34081eb03609e7ad5fcad129+62"
+
+       Dispatch1Token    = "kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw"
+       Dispatch1AuthUUID = "zzzzz-gj3su-k9dvestay1plssr"
 )
 
 // A valid manifest designed to test various edge cases and parsing
index d88897f1234329b5294bedb1da4e4104f9720b4e..81a9b36182a8545adbdcd3fd6afec7f0fba53602 100644 (file)
@@ -83,14 +83,18 @@ class EventClient(object):
             self.filters = [[]]
         self.on_event_cb = on_event_cb
         self.last_log_id = last_log_id
-        self.is_closed = False
-        self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
+        self.is_closed = threading.Event()
+        self._setup_event_client()
 
-    def connect(self):
-        self.ec.connect()
-
-    def close_connection(self):
-        self.ec.close_connection()
+    def _setup_event_client(self):
+        self.ec = _EventClient(self.url, self.filters, self.on_event,
+                               self.last_log_id, self.on_closed)
+        self.ec.daemon = True
+        try:
+            self.ec.connect()
+        except Exception:
+            self.ec.close_connection()
+            raise
 
     def subscribe(self, f, last_log_id=None):
         self.filters.append(f)
@@ -101,7 +105,7 @@ class EventClient(object):
         self.ec.unsubscribe(f)
 
     def close(self, code=1000, reason='', timeout=0):
-        self.is_closed = True
+        self.is_closed.set()
         self.ec.close(code, reason, timeout)
 
     def on_event(self, m):
@@ -114,21 +118,25 @@ class EventClient(object):
             thread.interrupt_main()
 
     def on_closed(self):
-        if self.is_closed == False:
+        if not self.is_closed.is_set():
             _logger.warn("Unexpected close. Reconnecting.")
             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                 try:
-                    self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
-                    self.ec.connect()
+                    self._setup_event_client()
                     break
                 except Exception as e:
                     _logger.warn("Error '%s' during websocket reconnect.", e)
             if tries_left == 0:
                 _logger.exception("EventClient thread could not contact websocket server.")
-                self.is_closed = True
+                self.is_closed.set()
                 thread.interrupt_main()
                 return
 
+    def run_forever(self):
+        # Have to poll here to let KeyboardInterrupt get raised.
+        while not self.is_closed.wait(1):
+            pass
+
 
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time, last_log_id):
@@ -250,20 +258,14 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
     if not endpoint:
         raise errors.FeatureNotEnabledError(
             "Server does not advertise a websocket endpoint")
+    uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
     try:
-        uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
         client = EventClient(uri_with_token, filters, on_event, last_log_id)
-        ok = False
-        try:
-            client.connect()
-            ok = True
-            return client
-        finally:
-            if not ok:
-                client.close_connection()
-    except:
+    except Exception:
         _logger.warn("Failed to connect to websockets on %s" % endpoint)
         raise
+    else:
+        return client
 
 
 def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
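
A hedged usage sketch, not part of this commit: with run_forever() added above, a caller can block on the event client directly and still receive KeyboardInterrupt. The filter and callback below are arbitrary examples; subscribe() follows the signature shown above.

    # Illustrative only. Assumes ARVADOS_API_HOST/ARVADOS_API_TOKEN are set
    # in the environment; the filter is an arbitrary example.
    import arvados
    import arvados.events

    def on_event(event):
        # Each event is a log record dict delivered by the server.
        print(event.get('event_type'), event.get('object_uuid'))

    api = arvados.api('v1')
    ws = arvados.events.subscribe(api, [['event_type', '=', 'update']], on_event)
    try:
        ws.run_forever()   # polls is_closed, so Ctrl-C still works
    finally:
        ws.close()
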
similarity index 65%
rename from sdk/python/tests/test_websockets.py
rename to sdk/python/tests/test_events.py
index d122a1cf42570a71b9a953f2a15636b6cd50ee33..f2cdba28c775a523bc178052644cb4a76dac2771 100644 (file)
@@ -1,23 +1,21 @@
 import arvados
-import arvados.events
-import arvados.errors
-from datetime import datetime, timedelta, tzinfo
+import io
 import logging
-import logging.handlers
 import mock
 import Queue
 import run_test_server
-import StringIO
-import tempfile
 import threading
 import time
 import unittest
 
+import arvados_testutil
+
 class WebsocketTest(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
 
     TIME_PAST = time.time()-3600
     TIME_FUTURE = time.time()+3600
+    MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
 
     def setUp(self):
         self.ws = None
@@ -153,7 +151,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         run_test_server.authorize_with('active')
         events = Queue.Queue(100)
 
-        logstream = StringIO.StringIO()
+        logstream = io.BytesIO()
         rootLogger = logging.getLogger()
         streamHandler = logging.StreamHandler(logstream)
         rootLogger.addHandler(streamHandler)
@@ -178,7 +176,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
         # close (im)properly
         if close_unexpected:
-            self.ws.close_connection()
+            self.ws.ec.close_connection()
         else:
             self.ws.close()
 
@@ -221,7 +219,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
     def test_websocket_reconnect_retry(self, event_client_connect):
         event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
 
-        logstream = StringIO.StringIO()
+        logstream = io.BytesIO()
         rootLogger = logging.getLogger()
         streamHandler = logging.StreamHandler(logstream)
         rootLogger.addHandler(streamHandler)
@@ -245,3 +243,130 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.")
         self.assertNotEqual(found, -1)
         rootLogger.removeHandler(streamHandler)
+
+    @mock.patch('arvados.events._EventClient')
+    def test_subscribe_method(self, websocket_client):
+        filters = [['object_uuid', 'is_a', 'arvados#human']]
+        client = arvados.events.EventClient(
+            self.MOCK_WS_URL, [], lambda event: None, None)
+        client.subscribe(filters[:], 99)
+        websocket_client().subscribe.assert_called_with(filters, 99)
+
+    @mock.patch('arvados.events._EventClient')
+    def test_unsubscribe(self, websocket_client):
+        filters = [['object_uuid', 'is_a', 'arvados#human']]
+        client = arvados.events.EventClient(
+            self.MOCK_WS_URL, filters[:], lambda event: None, None)
+        client.unsubscribe(filters[:])
+        websocket_client().unsubscribe.assert_called_with(filters)
+
+    @mock.patch('arvados.events._EventClient')
+    def test_run_forever_survives_reconnects(self, websocket_client):
+        connection_cond = threading.Condition()
+        def ws_connect():
+            with connection_cond:
+                connection_cond.notify_all()
+        websocket_client().connect.side_effect = ws_connect
+        client = arvados.events.EventClient(
+            self.MOCK_WS_URL, [], lambda event: None, None)
+        with connection_cond:
+            forever_thread = threading.Thread(target=client.run_forever)
+            forever_thread.start()
+            # Simulate an unexpected disconnect, and wait for reconnect.
+            close_thread = threading.Thread(target=client.on_closed)
+            close_thread.start()
+            connection_cond.wait()
+        close_thread.join()
+        run_forever_alive = forever_thread.is_alive()
+        client.close()
+        forever_thread.join()
+        self.assertTrue(run_forever_alive)
+        self.assertEqual(2, websocket_client().connect.call_count)
+
+
+class PollClientTestCase(unittest.TestCase):
+    class MockLogs(object):
+        def __init__(self):
+            self.logs = []
+            self.lock = threading.Lock()
+
+        def add(self, log):
+            with self.lock:
+                self.logs.append(log)
+
+        def return_list(self, num_retries=None):
+            with self.lock:
+                retval = self.logs
+                self.logs = []
+            return {'items': retval, 'items_available': len(retval)}
+
+
+    def setUp(self):
+        self.logs = self.MockLogs()
+        self.arv = mock.MagicMock(name='arvados.api()')
+        self.arv.logs().list().execute.side_effect = self.logs.return_list
+        self.callback_cond = threading.Condition()
+        self.recv_events = []
+
+    def tearDown(self):
+        if hasattr(self, 'client'):
+            self.client.close(timeout=None)
+
+    def callback(self, event):
+        with self.callback_cond:
+            self.recv_events.append(event)
+            self.callback_cond.notify_all()
+
+    def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
+        if filters is None:
+            filters = []
+        if callback is None:
+            callback = self.callback
+        self.client = arvados.events.PollClient(
+            self.arv, filters, callback, poll_time, last_log_id)
+
+    def was_filter_used(self, target):
+        return any(target in call[-1].get('filters', [])
+                   for call in self.arv.logs().list.call_args_list)
+
+    def test_callback(self):
+        test_log = {'id': 12345, 'testkey': 'testtext'}
+        self.logs.add({'id': 123})
+        self.build_client(poll_time=.01)
+        with self.callback_cond:
+            self.client.start()
+            self.callback_cond.wait()
+            self.logs.add(test_log.copy())
+            self.callback_cond.wait()
+        self.client.close(timeout=None)
+        self.assertIn(test_log, self.recv_events)
+
+    def test_subscribe(self):
+        client_filter = ['kind', '=', 'arvados#test']
+        self.build_client()
+        self.client.subscribe([client_filter[:]])
+        with self.callback_cond:
+            self.client.start()
+            self.callback_cond.wait()
+        self.client.close(timeout=None)
+        self.assertTrue(self.was_filter_used(client_filter))
+
+    def test_unsubscribe(self):
+        client_filter = ['kind', '=', 'arvados#test']
+        self.build_client()
+        self.client.subscribe([client_filter[:]])
+        self.client.unsubscribe([client_filter[:]])
+        self.client.start()
+        self.client.close(timeout=None)
+        self.assertFalse(self.was_filter_used(client_filter))
+
+    def test_run_forever(self):
+        self.build_client()
+        with self.callback_cond:
+            self.client.start()
+            forever_thread = threading.Thread(target=self.client.run_forever)
+            forever_thread.start()
+            self.callback_cond.wait()
+        self.assertTrue(forever_thread.is_alive())
+        self.client.close()
+        forever_thread.join()
index 355ed5d2d814b9542a6fca773a214de0e9af1292..2c4e60eeb8f1e664e2a4ed62879381fbb75d6e01 100644 (file)
@@ -22,7 +22,9 @@ Gem::Specification.new do |s|
   # activesupport <4.2.6 only because https://dev.arvados.org/issues/8222
   s.add_dependency('activesupport', '>= 3', '< 4.2.6')
   s.add_dependency('andand', '~> 1.3', '>= 1.3.3')
-  s.add_dependency('google-api-client', '>= 0.7', '< 0.9')
+  # Our google-api-client dependency used to be < 0.9, but that could be
+  # satisfied by the buggy 0.9.pre*.  https://dev.arvados.org/issues/9213
+  s.add_dependency('google-api-client', '>= 0.7', '< 0.8.9')
   # work around undeclared dependency on i18n in some activesupport 3.x.x:
   s.add_dependency('i18n', '~> 0')
   s.add_dependency('json', '~> 1.7', '>= 1.7.7')
index 5229d80b0c9ce530bcb62bebc4f8318a21c633e7..76acc701fd30194972b1f5491659f76ae8f7862e 100644 (file)
@@ -1,8 +1,9 @@
 class Arvados::V1::ApiClientAuthorizationsController < ApplicationController
   accept_attribute_as_json :scopes, Array
-  before_filter :current_api_client_is_trusted
+  before_filter :current_api_client_is_trusted, :except => [:current]
   before_filter :admin_required, :only => :create_system_auth
-  skip_before_filter :render_404_if_no_object, :only => :create_system_auth
+  skip_before_filter :render_404_if_no_object, :only => [:create_system_auth, :current]
+  skip_before_filter :find_object_by_uuid, :only => [:create_system_auth, :current]
 
   def self._create_system_auth_requires_parameters
     {
@@ -40,6 +41,11 @@ class Arvados::V1::ApiClientAuthorizationsController < ApplicationController
     super
   end
 
+  def current
+    @object = Thread.current[:api_client_authorization]
+    show
+  end
+
   protected
 
   def default_orders
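
A hedged illustration, not part of this commit: the new current action (routed below as GET /arvados/v1/api_client_authorizations/current) lets a client look up its own token record. The host and token values are placeholders; the OAuth2 header format matches the middleware and test helper shown elsewhere in this diff.

    # Illustrative only; host and token are placeholders.
    import requests

    api_host = 'zzzzz.arvadosapi.com'
    token = 'your-api-token-here'

    resp = requests.get(
        'https://{}/arvados/v1/api_client_authorizations/current'.format(api_host),
        headers={'Authorization': 'OAuth2 {}'.format(token)})
    resp.raise_for_status()
    auth = resp.json()
    print(auth['uuid'], auth['api_token'])
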
index 04a5ed0cb2b660b9bd71b214b6ce74dbde9e4ccd..21ee7efa53b5d4008c0f42717095194b9b0c39c6 100644 (file)
@@ -4,4 +4,19 @@ class Arvados::V1::ContainersController < ApplicationController
   accept_attribute_as_json :runtime_constraints, Hash
   accept_attribute_as_json :command, Array
 
+  def auth
+    if @object.locked_by_uuid != Thread.current[:api_client_authorization].uuid
+      raise ArvadosModel::PermissionDeniedError.new("Not locked by your token")
+    end
+    @object = @object.auth
+    show
+  end
+
+  # Updates use row locking to resolve races between multiple
+  # dispatchers trying to lock the same container.
+  def update
+    @object.with_lock do
+      super
+    end
+  end
 end
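
A rough dispatcher-side sketch, not part of this commit, of how the auth action and the Locked state fit together: lock the container with the dispatcher's token, then read the container's runtime token from the auth member route added in routes.rb below. The host, token, and parameter encoding are assumptions; only the routes and the "locked by your token" rule come from this diff.

    # Illustrative only; host, token, and encoding are assumptions.
    import json
    import requests

    api_host = 'zzzzz.arvadosapi.com'                 # placeholder
    dispatcher_token = 'dispatcher-token-here'        # placeholder
    container_uuid = 'zzzzz-dz642-queuedcontainer'    # fixture uuid, example only

    headers = {'Authorization': 'OAuth2 {}'.format(dispatcher_token)}
    base = 'https://{}/arvados/v1/containers/{}'.format(api_host, container_uuid)

    # Take the lock; the server records locked_by_uuid for this token.
    requests.put(base, headers=headers,
                 data={'container': json.dumps({'state': 'Locked'})}).raise_for_status()

    # Only the token holding the lock may read the container's runtime auth.
    auth_resp = requests.get(base + '/auth', headers=headers)
    auth_resp.raise_for_status()
    runtime_token = auth_resp.json()['api_token']
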
index 57d3ad02d748d7ee41540c0320ef8f068a8d375a..d8c04a1adbfcd0512bdbf38a4225081709ca2de8 100644 (file)
@@ -31,7 +31,7 @@ class ArvadosApiToken
     supplied_token =
       params["api_token"] ||
       params["oauth_token"] ||
-      env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-z0-9]+)/).andand[1]
+      env["HTTP_AUTHORIZATION"].andand.match(/OAuth2 ([a-zA-Z0-9]+)/).andand[1]
     if supplied_token
       api_client_auth = ApiClientAuthorization.
         includes(:api_client, :user).
index 787047df68e877bc8944b3c23186cc400c3ae227..4c770083786934abdafe9461f51ee03646396415 100644 (file)
@@ -16,9 +16,12 @@ class Container < ArvadosModel
   validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
   validate :validate_state_change
   validate :validate_change
+  validate :validate_lock
+  after_validation :assign_auth
   after_save :handle_completed
 
   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
+  belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
 
   api_accessible :user, extend: :common do |t|
     t.add :command
@@ -27,6 +30,7 @@ class Container < ArvadosModel
     t.add :environment
     t.add :exit_code
     t.add :finished_at
+    t.add :locked_by_uuid
     t.add :log
     t.add :mounts
     t.add :output
@@ -36,12 +40,14 @@ class Container < ArvadosModel
     t.add :runtime_constraints
     t.add :started_at
     t.add :state
+    t.add :auth_uuid
   end
 
   # Supported states for a container
   States =
     [
      (Queued = 'Queued'),
+     (Locked = 'Locked'),
      (Running = 'Running'),
      (Complete = 'Complete'),
      (Cancelled = 'Cancelled')
@@ -49,7 +55,8 @@ class Container < ArvadosModel
 
   State_transitions = {
     nil => [Queued],
-    Queued => [Running, Cancelled],
+    Queued => [Locked, Cancelled],
+    Locked => [Queued, Running, Cancelled],
     Running => [Complete, Cancelled]
   }
 
@@ -58,16 +65,13 @@ class Container < ArvadosModel
   end
 
   def update_priority!
-    if [Queued, Running].include? self.state
+    if [Queued, Locked, Running].include? self.state
       # Update the priority of this container to the maximum priority of any of
       # its committed container requests and save the record.
-      max = 0
-      ContainerRequest.where(container_uuid: uuid).each do |cr|
-        if cr.state == ContainerRequest::Committed and cr.priority > max
-          max = cr.priority
-        end
-      end
-      self.priority = max
+      self.priority = ContainerRequest.
+        where(container_uuid: uuid,
+              state: ContainerRequest::Committed).
+        maximum('priority')
       self.save!
     end
   end
@@ -102,52 +106,100 @@ class Container < ArvadosModel
   end
 
   def validate_change
-    permitted = []
+    permitted = [:state]
 
     if self.new_record?
-      permitted.push :owner_uuid, :command, :container_image, :cwd, :environment,
-                     :mounts, :output_path, :priority, :runtime_constraints, :state
+      permitted.push(:owner_uuid, :command, :container_image, :cwd,
+                     :environment, :mounts, :output_path, :priority,
+                     :runtime_constraints)
     end
 
     case self.state
-    when Queued
-      # permit priority change only.
+    when Queued, Locked
       permitted.push :priority
 
     when Running
+      permitted.push :priority, :progress
       if self.state_changed?
-        # At point of state change, can set state and started_at
-        permitted.push :state, :started_at
-      else
-        # While running, can update priority and progress.
-        permitted.push :priority, :progress
+        permitted.push :started_at
       end
 
     when Complete
-      if self.state_changed?
-        permitted.push :state, :finished_at, :output, :log, :exit_code
-      else
-        errors.add :state, "cannot update record"
+      if self.state_was == Running
+        permitted.push :finished_at, :output, :log, :exit_code
       end
 
     when Cancelled
-      if self.state_changed?
-        if self.state_was == Running
-          permitted.push :state, :finished_at, :output, :log
-        elsif self.state_was == Queued
-          permitted.push :state, :finished_at
-        end
-      else
-        errors.add :state, "cannot update record"
+      case self.state_was
+      when Running
+        permitted.push :finished_at, :output, :log
+      when Queued, Locked
+        permitted.push :finished_at
       end
 
     else
-      errors.add :state, "invalid state"
+      # The state_transitions check will add an error message for this
+      return false
     end
 
     check_update_whitelist permitted
   end
 
+  def validate_lock
+    # If the Container is already locked by someone other than the
+    # current api_client_auth, disallow all changes -- except
+    # priority, which needs to change to reflect max(priority) of
+    # relevant ContainerRequests.
+    if locked_by_uuid_was
+      if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
+        check_update_whitelist [:priority]
+      end
+    end
+
+    if [Locked, Running].include? self.state
+      # If the Container was already locked, locked_by_uuid must not
+      # change. Otherwise, the current auth gets the lock.
+      need_lock = locked_by_uuid_was || Thread.current[:api_client_authorization].uuid
+    else
+      need_lock = nil
+    end
+
+    # The caller can provide a new value for locked_by_uuid, but only
+    # if it's exactly what we expect. This allows a caller to perform
+    # an update like {"state":"Queued","locked_by_uuid":null}.
+    if self.locked_by_uuid_changed?
+      if self.locked_by_uuid != need_lock
+        return errors.add :locked_by_uuid, "can only change to #{need_lock}"
+      end
+    end
+    self.locked_by_uuid = need_lock
+  end
+
+  def assign_auth
+    if self.auth_uuid_changed?
+      return errors.add :auth_uuid, 'is readonly'
+    end
+    if not [Locked, Running].include? self.state
+      # don't need one
+      self.auth.andand.update_attributes(expires_at: db_current_time)
+      self.auth = nil
+      return
+    elsif self.auth
+      # already have one
+      return
+    end
+    cr = ContainerRequest.
+      where('container_uuid=? and priority>0', self.uuid).
+      order('priority desc').
+      first
+    if !cr
+      return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
+    end
+    self.auth = ApiClientAuthorization.
+      create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
+              api_client_id: 0)
+  end
+
   def handle_completed
     # This container is finished so finalize any associated container requests
     # that are associated with this container.
index acb751c89401424e04d03b950d5acc675dceaca8..6353132e908baa3d683ec0a9d320ff3a60d55804 100644 (file)
@@ -78,32 +78,32 @@ class ContainerRequest < ArvadosModel
     self.cwd ||= "."
   end
 
-  # Turn a container request into a container.
+  # Create a new container (or find an existing one) to satisfy this
+  # request.
   def resolve
-    # In the future this will do things like resolve symbolic git and keep
-    # references to content addresses.
-    Container.create!({ :command => self.command,
-                        :container_image => self.container_image,
-                        :cwd => self.cwd,
-                        :environment => self.environment,
-                        :mounts => self.mounts,
-                        :output_path => self.output_path,
-                        :runtime_constraints => self.runtime_constraints })
+    # TODO: resolve symbolic git and keep references to content
+    # addresses.
+    c = act_as_system_user do
+      Container.create!(command: self.command,
+                        container_image: self.container_image,
+                        cwd: self.cwd,
+                        environment: self.environment,
+                        mounts: self.mounts,
+                        output_path: self.output_path,
+                        runtime_constraints: self.runtime_constraints)
+    end
+    self.container_uuid = c.uuid
   end
 
   def set_container
-    if self.container_uuid_changed?
-      if not current_user.andand.is_admin and not self.container_uuid.nil?
-        errors.add :container_uuid, "can only be updated to nil."
-      end
-    else
-      if self.state_changed?
-        if self.state == Committed and (self.state_was == Uncommitted or self.state_was.nil?)
-          act_as_system_user do
-            self.container_uuid = self.resolve.andand.uuid
-          end
-        end
-      end
+    if (container_uuid_changed? and
+        not current_user.andand.is_admin and
+        not container_uuid.nil?)
+      errors.add :container_uuid, "can only be updated to nil."
+      return false
+    end
+    if state_changed? and state == Committed and container_uuid.nil?
+      resolve
     end
   end
 
@@ -158,16 +158,14 @@ class ContainerRequest < ArvadosModel
   end
 
   def update_priority
-    if [Committed, Final].include? self.state and (self.state_changed? or
-                                                   self.priority_changed? or
-                                                   self.container_uuid_changed?)
-      [self.container_uuid_was, self.container_uuid].each do |cuuid|
-        unless cuuid.nil?
-          c = Container.find_by_uuid cuuid
-          act_as_system_user do
-            c.update_priority!
-          end
-        end
+    if self.state_changed? or
+        self.priority_changed? or
+        self.container_uuid_changed?
+      act_as_system_user do
+        Container.
+          where('uuid in (?)',
+                [self.container_uuid_was, self.container_uuid].compact).
+          map(&:update_priority!)
       end
     end
   end
index c85a3fc57af20d2461516c384623a452a86dbdda..ed8f8d89af9c2d429dfe661633df2ec36a0484e9 100644 (file)
@@ -15,6 +15,7 @@ Server::Application.routes.draw do
     namespace :v1 do
       resources :api_client_authorizations do
         post 'create_system_auth', on: :collection
+        get 'current', on: :collection
       end
       resources :api_clients
       resources :authorized_keys
@@ -28,7 +29,9 @@ Server::Application.routes.draw do
       end
       resources :humans
       resources :job_tasks
-      resources :containers
+      resources :containers do
+        get 'auth', on: :member
+      end
       resources :container_requests
       resources :jobs do
         get 'queue', on: :collection
diff --git a/services/api/db/migrate/20160506175108_add_auths_to_container.rb b/services/api/db/migrate/20160506175108_add_auths_to_container.rb
new file mode 100644 (file)
index 0000000..d714a49
--- /dev/null
@@ -0,0 +1,6 @@
+class AddAuthsToContainer < ActiveRecord::Migration
+  def change
+    add_column :containers, :auth_uuid, :string
+    add_column :containers, :locked_by_uuid, :string
+  end
+end
diff --git a/services/api/db/migrate/20160509143250_add_auth_and_lock_to_container_index.rb b/services/api/db/migrate/20160509143250_add_auth_and_lock_to_container_index.rb
new file mode 100644 (file)
index 0000000..4329ac0
--- /dev/null
@@ -0,0 +1,19 @@
+class AddAuthAndLockToContainerIndex < ActiveRecord::Migration
+  Columns_were = ["uuid", "owner_uuid", "modified_by_client_uuid", "modified_by_user_uuid", "state", "log", "cwd", "output_path", "output", "container_image"]
+  Columns = Columns_were + ["auth_uuid", "locked_by_uuid"]
+  def up
+    begin
+      remove_index :containers, :name => 'containers_search_index'
+    rescue
+    end
+    add_index(:containers, Columns, name: "containers_search_index")
+  end
+
+  def down
+    begin
+      remove_index :containers, :name => 'containers_search_index'
+    rescue
+    end
+    add_index(:containers, Columns_were, name: "containers_search_index")
+  end
+end
index 3ec420c1ddcf1d3ac84991359e02bb03ce1b10e9..4bf4a173bd9d1c31e04d0f7517c1927baf9f3ff2 100644 (file)
@@ -339,7 +339,9 @@ CREATE TABLE containers (
     progress double precision,
     priority integer,
     updated_at timestamp without time zone NOT NULL,
-    exit_code integer
+    exit_code integer,
+    auth_uuid character varying(255),
+    locked_by_uuid character varying(255)
 );
 
 
@@ -1472,7 +1474,7 @@ CREATE INDEX container_requests_search_index ON container_requests USING btree (
 -- Name: containers_search_index; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
 
-CREATE INDEX containers_search_index ON containers USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, state, log, cwd, output_path, output, container_image);
+CREATE INDEX containers_search_index ON containers USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, state, log, cwd, output_path, output, container_image, auth_uuid, locked_by_uuid);
 
 
 --
@@ -2583,4 +2585,8 @@ INSERT INTO schema_migrations (version) VALUES ('20160208210629');
 
 INSERT INTO schema_migrations (version) VALUES ('20160209155729');
 
-INSERT INTO schema_migrations (version) VALUES ('20160324144017');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20160324144017');
+
+INSERT INTO schema_migrations (version) VALUES ('20160506175108');
+
+INSERT INTO schema_migrations (version) VALUES ('20160509143250');
\ No newline at end of file
index 2e78612fc2d8b0ea3883cbb964d73c3e443e9c21..fbd4ef5f0c67933a7cc703d9f532c94fd601fc3d 100644 (file)
@@ -124,12 +124,18 @@ module CurrentApiClient
   end
 
   def act_as_user user
+    #auth_was = Thread.current[:api_client_authorization]
     user_was = Thread.current[:user]
     Thread.current[:user] = user
+    #Thread.current[:api_client_authorization] = ApiClientAuthorization.
+    #  where('user_id=? and scopes is null', user.id).
+    #  order('expires_at desc').
+    #  first
     begin
       yield
     ensure
       Thread.current[:user] = user_was
+      #Thread.current[:api_client_authorization] = auth_was
     end
   end
 
index a81f9924f01aa182bf35efc2e48dd326b3b20942..8fccd0f45c36416e72034a06e8e3ce3880f7aa04 100644 (file)
@@ -2,7 +2,7 @@ module WhitelistUpdate
   def check_update_whitelist permitted_fields
     attribute_names.each do |field|
       if not permitted_fields.include? field.to_sym and self.send((field.to_s + "_changed?").to_sym)
-        errors.add field, "illegal update of field"
+        errors.add field, "cannot be modified in this state"
       end
     end
   end
@@ -10,7 +10,7 @@ module WhitelistUpdate
   def validate_state_change
     if self.state_changed?
       unless state_transitions[self.state_was].andand.include? self.state
-        errors.add :state, "invalid state change from #{self.state_was} to #{self.state}"
+        errors.add :state, "cannot change from #{self.state_was} to #{self.state}"
         return false
       end
     end
index f99a9fb941f1b26f44d2d4b4035a28afd84fbc08..d6c0e40e7cc00a66dd0b93b3b86b36ef9ad770a0 100644 (file)
@@ -271,3 +271,9 @@ fuse:
   api_token: 4nagbkv8eap0uok7pxm72nossq5asihls3yn5p4xmvqx5t5e7p
   expires_at: 2038-01-01 00:00:00
 
+dispatch1:
+  uuid: zzzzz-gj3su-k9dvestay1plssr
+  api_client: untrusted
+  user: system_user
+  api_token: kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw
+  expires_at: 2038-01-01 00:00:00
\ No newline at end of file
diff --git a/services/api/test/fixtures/container_requests.yml b/services/api/test/fixtures/container_requests.yml
new file mode 100644 (file)
index 0000000..c9f3427
--- /dev/null
@@ -0,0 +1,13 @@
+queued:
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  state: Committed
+  priority: 1
+  created_at: 2016-01-11 11:11:11.111111111 Z
+  updated_at: 2016-01-11 11:11:11.111111111 Z
+  modified_at: 2016-01-11 11:11:11.111111111 Z
+  modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  container_image: test
+  cwd: test
+  output_path: test
+  command: ["echo", "hello"]
+  container_uuid: zzzzz-dz642-queuedcontainer
index aa7ad314ff0c8712de4f0c31692cb0d304768cfd..b804c804a4c25b17505722ba341b4bd11c6e5269 100644 (file)
@@ -1,6 +1,6 @@
 queued:
   uuid: zzzzz-dz642-queuedcontainer
-  owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  owner_uuid: zzzzz-tpzed-000000000000000
   state: Queued
   priority: 1
   created_at: 2016-01-11 11:11:11.111111111 Z
@@ -16,7 +16,7 @@ queued:
 
 completed:
   uuid: zzzzz-dz642-compltcontainer
-  owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  owner_uuid: zzzzz-tpzed-000000000000000
   state: Complete
   priority: 1
   created_at: 2016-01-11 11:11:11.111111111 Z
index 9f0f555d55eb8963f8f13eb74ef9536d42694274..37e690e0b21ccb221454d6a46ac04b4e732c2ce7 100644 (file)
@@ -168,4 +168,17 @@ class Arvados::V1::ApiClientAuthorizationsControllerTest < ActionController::Tes
     }
     assert_response 403
   end
+
+  test "get current token" do
+    authorize_with :active
+    get :current
+    assert_response :success
+    assert_equal(json_response['api_token'],
+                 api_client_authorizations(:active).api_token)
+  end
+
+  test "get current token, no auth" do
+    get :current
+    assert_response 401
+  end
 end
diff --git a/services/api/test/functional/arvados/v1/containers_controller_test.rb b/services/api/test/functional/arvados/v1/containers_controller_test.rb
new file mode 100644 (file)
index 0000000..d9f7d96
--- /dev/null
@@ -0,0 +1,52 @@
+require 'test_helper'
+
+class Arvados::V1::ContainersControllerTest < ActionController::TestCase
+  test 'create' do
+    authorize_with :system_user
+    post :create, {
+      container: {
+        command: ['echo', 'hello'],
+        container_image: 'test',
+        output_path: 'test',
+      },
+    }
+    assert_response :success
+  end
+
+  [Container::Queued, Container::Complete].each do |state|
+    test "cannot get auth in #{state} state" do
+      authorize_with :dispatch1
+      get :auth, id: containers(:queued).uuid
+      assert_response 403
+    end
+  end
+
+  test 'cannot get auth with wrong token' do
+    authorize_with :dispatch1
+    c = containers(:queued)
+    assert c.update_attributes(state: Container::Locked), show_errors(c)
+
+    authorize_with :system_user
+    get :auth, id: c.uuid
+    assert_response 403
+  end
+
+  test 'get auth' do
+    authorize_with :dispatch1
+    c = containers(:queued)
+    assert c.update_attributes(state: Container::Locked), show_errors(c)
+    get :auth, id: c.uuid
+    assert_response :success
+    assert_operator 32, :<, json_response['api_token'].length
+    assert_equal 'arvados#apiClientAuthorization', json_response['kind']
+  end
+
+  test 'no auth in container response' do
+    authorize_with :dispatch1
+    c = containers(:queued)
+    assert c.update_attributes(state: Container::Locked), show_errors(c)
+    get :show, id: c.uuid
+    assert_response :success
+    assert_nil json_response['auth']
+  end
+end
index 25ab286a23099f4125aaf1cce65a24450804e051..ef08c726ae2432481409fd054b12122fb4b7ce25 100644 (file)
@@ -36,6 +36,10 @@ module ArvadosTestSupport
   def auth(api_client_auth_name)
     {'HTTP_AUTHORIZATION' => "OAuth2 #{api_token(api_client_auth_name)}"}
   end
+
+  def show_errors model
+    return lambda { model.errors.full_messages.inspect }
+  end
 end
 
 class ActiveSupport::TestCase
index d0def576c3b6450b633cf08003ba5bf4ea296e2f..701147cf6576997c04bf633650b0770c193136ee 100644 (file)
@@ -306,18 +306,18 @@ class ContainerRequestTest < ActiveSupport::TestCase
     assert_equal "Committed", cr.state
 
     c = Container.find_by_uuid cr.container_uuid
-    assert_equal "Queued", c.state
+    assert_equal Container::Queued, c.state
 
     act_as_system_user do
-      c.state = "Running"
-      c.save!
+      c.update_attributes! state: Container::Locked
+      c.update_attributes! state: Container::Running
     end
 
     cr.reload
     assert_equal "Committed", cr.state
 
     act_as_system_user do
-      c.state = "Complete"
+      c.update_attributes! state: Container::Complete
       c.save!
     end
 
index 0cac6acd936332eaa3b76eb28fad3c84479b1f5a..9cc098117f68f3434b80ca7ee557f1ecfa89fa51 100644 (file)
 require 'test_helper'
 
 class ContainerTest < ActiveSupport::TestCase
-  def check_illegal_modify c
-    c.reload
-    c.command = ["echo", "bar"]
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.container_image = "img2"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
+  include DbCurrentTime
 
-    c.reload
-    c.cwd = "/tmp2"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
+  DEFAULT_ATTRS = {
+    command: ['echo', 'foo'],
+    container_image: 'img',
+    output_path: '/tmp',
+    priority: 1,
+  }
 
-    c.reload
-    c.environment = {"FOO" => "BAR"}
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
+  def minimal_new attrs={}
+    cr = ContainerRequest.new DEFAULT_ATTRS.merge(attrs)
+    act_as_user users(:active) do
+      cr.save!
     end
-
-    c.reload
-    c.mounts = {"FOO" => "BAR"}
-    assert_raises(ActiveRecord::RecordInvalid) do
+    c = Container.new DEFAULT_ATTRS.merge(attrs)
+    act_as_system_user do
       c.save!
+      assert cr.update_attributes(container_uuid: c.uuid,
+                                  state: ContainerRequest::Committed,
+                                  ), show_errors(cr)
     end
+    return c, cr
+  end
 
-    c.reload
-    c.output_path = "/tmp3"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
+  def check_illegal_updates c, bad_updates
+    bad_updates.each do |u|
+      refute c.update_attributes(u), u.inspect
+      refute c.valid?, u.inspect
+      c.reload
     end
+  end
 
-    c.reload
-    c.runtime_constraints = {"FOO" => "BAR"}
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
+  def check_illegal_modify c
+    check_illegal_updates c, [{command: ["echo", "bar"]},
+                              {container_image: "img2"},
+                              {cwd: "/tmp2"},
+                              {environment: {"FOO" => "BAR"}},
+                              {mounts: {"FOO" => "BAR"}},
+                              {output_path: "/tmp3"},
+                              {locked_by_uuid: "zzzzz-gj3su-027z32aux8dg2s1"},
+                              {auth_uuid: "zzzzz-gj3su-017z32aux8dg2s1"},
+                              {runtime_constraints: {"FOO" => "BAR"}}]
   end
 
   def check_bogus_states c
-    c.reload
-    c.state = nil
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.state = "Flubber"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
+    check_illegal_updates c, [{state: nil},
+                              {state: "Flubber"}]
   end
 
-  def check_no_change_from_complete c
+  def check_no_change_from_cancelled c
     check_illegal_modify c
     check_bogus_states c
-
-    c.reload
-    c.priority = 3
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.state = "Queued"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.state = "Running"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.state = "Complete"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
+    check_illegal_updates c, [{ priority: 3 },
+                              { state: Container::Queued },
+                              { state: Container::Locked },
+                              { state: Container::Running },
+                              { state: Container::Complete }]
   end
 
   test "Container create" do
     act_as_system_user do
-      c = Container.new
-      c.command = ["echo", "foo"]
-      c.container_image = "img"
-      c.cwd = "/tmp"
-      c.environment = {}
-      c.mounts = {"BAR" => "FOO"}
-      c.output_path = "/tmp"
-      c.priority = 1
-      c.runtime_constraints = {}
-      c.save!
+      c, _ = minimal_new(environment: {},
+                      mounts: {"BAR" => "FOO"},
+                      output_path: "/tmp",
+                      priority: 1,
+                      runtime_constraints: {})
 
       check_illegal_modify c
       check_bogus_states c
@@ -111,80 +78,100 @@ class ContainerTest < ActiveSupport::TestCase
   end
 
   test "Container running" do
-    act_as_system_user do
-      c = Container.new
-      c.command = ["echo", "foo"]
-      c.container_image = "img"
-      c.output_path = "/tmp"
-      c.save!
+    c, _ = minimal_new priority: 1
 
-      c.reload
-      c.state = "Complete"
-      assert_raises(ActiveRecord::RecordInvalid) do
-        c.save!
-      end
+    set_user_from_auth :dispatch1
+    check_illegal_updates c, [{state: Container::Running},
+                              {state: Container::Complete}]
 
-      c.reload
-      c.state = "Running"
-      c.save!
+    c.update_attributes! state: Container::Locked
+    c.update_attributes! state: Container::Running
 
-      check_illegal_modify c
-      check_bogus_states c
+    check_illegal_modify c
+    check_bogus_states c
 
-      c.reload
-      c.state = "Queued"
-      assert_raises(ActiveRecord::RecordInvalid) do
-        c.save!
-      end
+    check_illegal_updates c, [{state: Container::Queued}]
+    c.reload
 
-      c.reload
-      c.priority = 3
-      c.save!
-    end
+    c.update_attributes! priority: 3
   end
 
-  test "Container queued cancel" do
-    act_as_system_user do
-      c = Container.new
-      c.command = ["echo", "foo"]
-      c.container_image = "img"
-      c.output_path = "/tmp"
-      c.save!
+  test "Lock and unlock" do
+    c, cr = minimal_new priority: 0
 
-      c.reload
-      c.state = "Cancelled"
-      c.save!
+    set_user_from_auth :dispatch1
+    assert_equal Container::Queued, c.state
 
-      check_no_change_from_complete c
-    end
-  end
+    refute c.update_attributes(state: Container::Locked), "no priority"
+    c.reload
+    assert cr.update_attributes priority: 1
 
-  test "Container running cancel" do
-    act_as_system_user do
-      c = Container.new
-      c.command = ["echo", "foo"]
-      c.container_image = "img"
-      c.output_path = "/tmp"
-      c.save!
+    refute c.update_attributes(state: Container::Running), "not locked"
+    c.reload
+    refute c.update_attributes(state: Container::Complete), "not locked"
+    c.reload
 
-      c.reload
-      c.state = "Running"
-      c.save!
+    assert c.update_attributes(state: Container::Locked), show_errors(c)
+    assert c.locked_by_uuid
+    assert c.auth_uuid
 
-      c.reload
-      c.state = "Cancelled"
-      c.save!
+    assert c.update_attributes(state: Container::Queued), show_errors(c)
+    refute c.locked_by_uuid
+    refute c.auth_uuid
 
-      check_no_change_from_complete c
-    end
+    refute c.update_attributes(state: Container::Running), "not locked"
+    c.reload
+    refute c.locked_by_uuid
+    refute c.auth_uuid
+
+    assert c.update_attributes(state: Container::Locked), show_errors(c)
+    assert c.update_attributes(state: Container::Running), show_errors(c)
+    assert c.locked_by_uuid
+    assert c.auth_uuid
+
+    auth_uuid_was = c.auth_uuid
+
+    refute c.update_attributes(state: Container::Locked), "already running"
+    c.reload
+    refute c.update_attributes(state: Container::Queued), "already running"
+    c.reload
+
+    assert c.update_attributes(state: Container::Complete), show_errors(c)
+    refute c.locked_by_uuid
+    refute c.auth_uuid
+
+    auth_exp = ApiClientAuthorization.find_by_uuid(auth_uuid_was).expires_at
+    assert_operator auth_exp, :<, db_current_time
+  end
+
+  test "Container queued cancel" do
+    c, _ = minimal_new
+    set_user_from_auth :dispatch1
+    assert c.update_attributes(state: Container::Cancelled), show_errors(c)
+    check_no_change_from_cancelled c
+  end
+
+  test "Container locked cancel" do
+    c, _ = minimal_new
+    set_user_from_auth :dispatch1
+    assert c.update_attributes(state: Container::Locked), show_errors(c)
+    assert c.update_attributes(state: Container::Cancelled), show_errors(c)
+    check_no_change_from_cancelled c
+  end
+
+  test "Container running cancel" do
+    c, _ = minimal_new
+    set_user_from_auth :dispatch1
+    c.update_attributes! state: Container::Queued
+    c.update_attributes! state: Container::Locked
+    c.update_attributes! state: Container::Running
+    c.update_attributes! state: Container::Cancelled
+    check_no_change_from_cancelled c
   end
 
   test "Container create forbidden for non-admin" do
     set_user_from_auth :active_trustedclient
-    c = Container.new
-    c.command = ["echo", "foo"]
-    c.container_image = "img"
-    c.cwd = "/tmp"
+    c = Container.new DEFAULT_ATTRS
     c.environment = {}
     c.mounts = {"BAR" => "FOO"}
     c.output_path = "/tmp"
@@ -196,34 +183,14 @@ class ContainerTest < ActiveSupport::TestCase
   end
 
   test "Container only set exit code on complete" do
-    act_as_system_user do
-      c = Container.new
-      c.command = ["echo", "foo"]
-      c.container_image = "img"
-      c.output_path = "/tmp"
-      c.save!
+    c, _ = minimal_new
+    set_user_from_auth :dispatch1
+    c.update_attributes! state: Container::Locked
+    c.update_attributes! state: Container::Running
 
-      c.reload
-      c.state = "Running"
-      c.save!
+    check_illegal_updates c, [{exit_code: 1},
+                              {exit_code: 1, state: Container::Cancelled}]
 
-      c.reload
-      c.exit_code = 1
-      assert_raises(ActiveRecord::RecordInvalid) do
-        c.save!
-      end
-
-      c.reload
-      c.exit_code = 1
-      c.state = "Cancelled"
-      assert_raises(ActiveRecord::RecordInvalid) do
-        c.save!
-      end
-
-      c.reload
-      c.exit_code = 1
-      c.state = "Complete"
-      c.save!
-    end
+    assert c.update_attributes(exit_code: 1, state: Container::Complete)
   end
 end
index e05c0c5da4439e44931837ea5a259885624b80d8..40238706063c6659f0e39d89ac1af7eca5e85795 100644 (file)
@@ -72,7 +72,7 @@ func doMain() error {
        }(sigChan)
 
        // Run all queued containers
-       runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+       runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
 
        // Finished dispatching; interrupt any crunch jobs that are still running
        for _, cmd := range runningCmds {
@@ -91,8 +91,8 @@ func doMain() error {
 // Any errors encountered are logged but the program would continue to run (not exit).
 // This is because, once one or more crunch jobs are running,
 // we would need to wait for them to complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
-       ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
+       ticker := time.NewTicker(pollInterval)
 
        for {
                select {
@@ -107,9 +107,10 @@ func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunComman
 
 // Container data
 type Container struct {
-       UUID     string `json:"uuid"`
-       State    string `json:"state"`
-       Priority int    `json:"priority"`
+       UUID         string `json:"uuid"`
+       State        string `json:"state"`
+       Priority     int    `json:"priority"`
+       LockedByUUID string `json:"locked_by_uuid"`
 }
 
 // ContainerList is a list of the containers from api
@@ -118,7 +119,7 @@ type ContainerList struct {
 }
 
 // Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
+func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
        params := arvadosclient.Dict{
                "filters": [][]string{[]string{"state", "=", "Queued"}},
        }
@@ -130,91 +131,120 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
                return
        }
 
-       for i := 0; i < len(containers.Items); i++ {
-               log.Printf("About to run queued container %v", containers.Items[i].UUID)
+       for _, c := range containers.Items {
+               log.Printf("About to run queued container %v", c.UUID)
                // Run the container
-               go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+               waitGroup.Add(1)
+               go func(c Container) {
+                       run(c.UUID, crunchRunCommand, pollInterval)
+                       waitGroup.Done()
+               }(c)
        }
 }
 
+func updateState(uuid, newState string) error {
+       err := arv.Update("containers", uuid,
+               arvadosclient.Dict{
+                       "container": arvadosclient.Dict{"state": newState}},
+               nil)
+       if err != nil {
+               log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+       }
+       return err
+}
+
 // Run queued container:
-// Set container state to locked (TBD)
+// Set container state to Locked
 // Run container using the given crunch-run command
 // Set the container state to Running
 // If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
-       cmd := exec.Command(crunchRunCommand, uuid)
+func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
+       if err := updateState(uuid, "Locked"); err != nil {
+               return
+       }
 
+       cmd := exec.Command(crunchRunCommand, uuid)
        cmd.Stdin = nil
        cmd.Stderr = os.Stderr
        cmd.Stdout = os.Stderr
+
+       // Add this crunch job to the list of runningCmds only if we
+       // succeed in starting crunch-run.
+       runningCmdsMutex.Lock()
        if err := cmd.Start(); err != nil {
-               log.Printf("Error running container for %v: %q", uuid, err)
+               log.Printf("Error starting crunch-run for %v: %q", uuid, err)
+               runningCmdsMutex.Unlock()
+               updateState(uuid, "Queued")
                return
        }
-
-       // Add this crunch job to the list of runningCmds
-       runningCmdsMutex.Lock()
        runningCmds[uuid] = cmd
        runningCmdsMutex.Unlock()
 
-       log.Printf("Started container run for %v", uuid)
+       defer func() {
+               setFinalState(uuid)
 
-       // Add this crunch job to waitGroup
-       waitGroup.Add(1)
-       defer waitGroup.Done()
+               // Remove the crunch job from runningCmds
+               runningCmdsMutex.Lock()
+               delete(runningCmds, uuid)
+               runningCmdsMutex.Unlock()
+       }()
 
-       // Update container status to Running
-       err := arv.Update("containers", uuid,
-               arvadosclient.Dict{
-                       "container": arvadosclient.Dict{"state": "Running"}},
-               nil)
-       if err != nil {
-               log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
-       }
+       log.Printf("Starting container %v", uuid)
+
+       updateState(uuid, "Running")
+
+       cmdExited := make(chan struct{})
 
-       // A goroutine to terminate the runner if container priority becomes zero
-       priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+       // Kill the child process if container priority changes to zero
        go func() {
-               for _ = range priorityTicker.C {
+               ticker := time.NewTicker(pollInterval)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-cmdExited:
+                               return
+                       case <-ticker.C:
+                       }
                        var container Container
                        err := arv.Get("containers", uuid, nil, &container)
                        if err != nil {
-                               log.Printf("Error getting container info for %v: %q", uuid, err)
-                       } else {
-                               if container.Priority == 0 {
-                                       priorityTicker.Stop()
-                                       cmd.Process.Signal(os.Interrupt)
-                               }
+                               log.Printf("Error getting container %v: %q", uuid, err)
+                               continue
+                       }
+                       if container.Priority == 0 {
+                               log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+                               cmd.Process.Signal(os.Interrupt)
                        }
                }
        }()
 
-       // Wait for the crunch job to exit
+       // Wait for crunch-run to exit
        if _, err := cmd.Process.Wait(); err != nil {
                log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
        }
+       close(cmdExited)
 
-       // Remove the crunch job to runningCmds
-       runningCmdsMutex.Lock()
-       delete(runningCmds, uuid)
-       runningCmdsMutex.Unlock()
-
-       priorityTicker.Stop()
+       log.Printf("Finished container run for %v", uuid)
+}
 
-       // The container state should be 'Complete'
+func setFinalState(uuid string) {
+       // The container state should now be 'Complete' if everything
+       // went well. If crunch-run started but didn't move the state
+       // past 'Running', mark it 'Complete' now. If it never even
+       // started (state is still 'Locked'), cancel it as unrunnable.
+       // (TODO: Requeue instead, and fix tests so they can tell
+       // something happened even if the final state is Queued.)
        var container Container
-       err = arv.Get("containers", uuid, nil, &container)
-       if container.State == "Running" {
-               log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
-               err = arv.Update("containers", uuid,
-                       arvadosclient.Dict{
-                               "container": arvadosclient.Dict{"state": "Complete"}},
-                       nil)
-               if err != nil {
-                       log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
-               }
+       err := arv.Get("containers", uuid, nil, &container)
+       if err != nil {
+               log.Printf("Error getting final container state: %v", err)
+       }
+       fixState := map[string]string{
+               "Running": "Complete",
+               "Locked": "Cancelled",
+       }
+       if newState, ok := fixState[container.State]; ok {
+               log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
+               updateState(uuid, newState)
        }
-
-       log.Printf("Finished container run for %v", uuid)
 }
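
A reading aid, not part of the patch: the rewritten local dispatcher above calls an updateState() helper whose definition falls outside these hunks. A minimal sketch of what it plausibly looks like, assuming it simply wraps the same arv.Update call the old inline code used; the log message format is guessed from the test expectation below ("Error updating container ... to 'Locked' state") and may differ from the real helper.

// Sketch only. Relies on the package-level arv client and the
// arvadosclient and log imports already present in this file.
func updateState(uuid, newState string) error {
	err := arv.Update("containers", uuid,
		arvadosclient.Dict{
			"container": arvadosclient.Dict{"state": newState}},
		nil)
	if err != nil {
		log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
	}
	return err
}
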
index 3ec1e2ec6b41c4759bd6f75e6cddc847254633ab..e3ab3a4e1d55ddc5df609fc4273cdfc57e63430a 100644 (file)
@@ -4,12 +4,11 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
 
-       "io/ioutil"
+       "bytes"
        "log"
        "net/http"
        "net/http/httptest"
        "os"
-       "strings"
        "syscall"
        "testing"
        "time"
@@ -101,7 +100,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
        apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
                arvadostest.StubResponse{500, string(`{}`)}
 
-       testWithServerStub(c, apiStubResponses, "echo", "Error updating container state")
+       testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to 'Locked' state")
 }
 
 func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
@@ -122,7 +121,7 @@ func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
        apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
                arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
 
-       testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error running container for zzzzz-dz642-xxxxxxxxxxxxxx3")
+       testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting crunch-run for zzzzz-dz642-xxxxxxxxxxxxxx3")
 }
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
@@ -139,21 +138,19 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                Retries:   0,
        }
 
-       tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
-       c.Check(err, IsNil)
-       defer os.Remove(tempfile.Name())
-       log.SetOutput(tempfile)
+       buf := bytes.NewBuffer(nil)
+       log.SetOutput(buf)
+       defer log.SetOutput(os.Stderr)
 
        go func() {
                time.Sleep(2 * time.Second)
                sigChan <- syscall.SIGTERM
        }()
 
-       runQueuedContainers(1, 1, crunchCmd)
+       runQueuedContainers(time.Second, time.Second, crunchCmd)
 
        // Wait for all running crunch jobs to complete / terminate
        waitGroup.Wait()
 
-       buf, _ := ioutil.ReadFile(tempfile.Name())
-       c.Check(strings.Contains(string(buf), expected), Equals, true)
+       c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }
index f45c2a106a07a61f8c9af871936b51f83c178e12..53e470525385b4117b496e7cebae0bb41b69a9b3 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bufio"
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -86,6 +87,15 @@ func doMain() error {
        return nil
 }
 
+type apiClientAuthorization struct {
+       UUID     string `json:"uuid"`
+       APIToken string `json:"api_token"`
+}
+
+type apiClientAuthorizationList struct {
+       Items []apiClientAuthorization `json:"items"`
+}
+
 // Poll for queued containers using pollInterval.
 // Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
 //
@@ -93,12 +103,18 @@ func doMain() error {
 // This is because, once one or more crunch jobs are running,
 // we would need to wait for them to complete.
 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
-       ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+       var auth apiClientAuthorization
+       err := arv.Call("GET", "api_client_authorizations", "", "current", nil, &auth)
+       if err != nil {
+               log.Printf("Error getting my token UUID: %v", err)
+               return
+       }
 
+       ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
        for {
                select {
                case <-ticker.C:
-                       dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
+                       dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
                case <-doneProcessing:
                        ticker.Stop()
                        return
@@ -112,6 +128,7 @@ type Container struct {
        State              string           `json:"state"`
        Priority           int              `json:"priority"`
        RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
+       LockedByUUID       string           `json:"locked_by_uuid"`
 }
 
 // ContainerList is a list of the containers from api
@@ -119,10 +136,11 @@ type ContainerList struct {
        Items []Container `json:"items"`
 }
 
-// Get the list of queued containers from API server and invoke run for each container.
-func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
+// Get the list of queued and locked containers from the API server and
+// invoke run for each one this dispatcher should handle.
+func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
        params := arvadosclient.Dict{
-               "filters": [][]string{[]string{"state", "=", "Queued"}},
+               "filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
        }
 
        var containers ContainerList
@@ -132,16 +150,33 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand str
                return
        }
 
-       for i := 0; i < len(containers.Items); i++ {
-               log.Printf("About to submit queued container %v", containers.Items[i].UUID)
-               // Run the container
-               go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
+       for _, container := range containers.Items {
+               if container.State == "Locked" {
+                       if container.LockedByUUID != auth.UUID {
+                               // Locked by a different dispatcher
+                               continue
+                       } else if checkMine(container.UUID) {
+                               // I already have a goroutine running
+                               // for this container: it just hasn't
+                               // gotten past Locked state yet.
+                               continue
+                       }
+                       log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
+                               "Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
+                               container.UUID, auth.UUID)
+                       setMine(container.UUID, true)
+                       go func() {
+                               waitContainer(container, pollInterval)
+                               setMine(container.UUID, false)
+                       }()
+               }
+               go run(container, crunchRunCommand, finishCommand, pollInterval)
        }
 }
 
 // sbatchCmd
 func sbatchFunc(container Container) *exec.Cmd {
-       memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"]*1048576)))
+       memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
        return exec.Command("sbatch", "--share", "--parsable",
                "--job-name="+container.UUID,
                "--mem-per-cpu="+strconv.Itoa(int(memPerCPU)),
@@ -162,17 +197,19 @@ var striggerCmd = striggerFunc
 func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
        submitErr = nil
 
-       // Mark record as complete if anything errors out.
        defer func() {
-               if submitErr != nil {
-                       // This really should be an "Error" state, see #8018
-                       updateErr := arv.Update("containers", container.UUID,
-                               arvadosclient.Dict{
-                                       "container": arvadosclient.Dict{"state": "Complete"}},
-                               nil)
-                       if updateErr != nil {
-                               log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
-                       }
+               // If we didn't get as far as submitting a slurm job,
+               // unlock the container and return it to the queue.
+               if submitErr == nil {
+                       // OK, no cleanup needed
+                       return
+               }
+               err := arv.Update("containers", container.UUID,
+                       arvadosclient.Dict{
+                               "container": arvadosclient.Dict{"state": "Queued"}},
+                       nil)
+               if err != nil {
+                       log.Printf("Error unlocking container %s: %v", container.UUID, err)
                }
        }()
 
@@ -205,6 +242,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
        stdoutChan := make(chan []byte)
        go func() {
                b, _ := ioutil.ReadAll(stdoutReader)
+               stdoutReader.Close()
                stdoutChan <- b
                close(stdoutChan)
        }()
@@ -212,6 +250,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
        stderrChan := make(chan []byte)
        go func() {
                b, _ := ioutil.ReadAll(stderrReader)
+               stderrReader.Close()
                stderrChan <- b
                close(stderrChan)
        }()
@@ -246,18 +285,35 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiTok
        err := cmd.Run()
        if err != nil {
                log.Printf("While setting up strigger: %v", err)
+               // BUG: we drop the error here and forget about it. A
+               // human has to notice the container is stuck in
+               // Running state, and fix it manually.
        }
 }
 
-// Run a queued container.
-// Set container state to locked (TBD)
-// Submit job to slurm to execute crunch-run command for the container
-// If the container priority becomes zero while crunch job is still running, cancel the job.
-func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
+// Run a queued container: [1] Set container state to locked. [2]
+// Execute crunch-run as a slurm batch job. [3] waitContainer().
+func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
+       setMine(container.UUID, true)
+       defer setMine(container.UUID, false)
+
+       // Update container status to Locked. This will fail if
+       // another dispatcher (token) has already locked it. It will
+       // succeed if *this* dispatcher has already locked it.
+       err := arv.Update("containers", container.UUID,
+               arvadosclient.Dict{
+                       "container": arvadosclient.Dict{"state": "Locked"}},
+               nil)
+       if err != nil {
+               log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
+               return
+       }
+
+       log.Printf("About to submit queued container %v", container.UUID)
 
        jobid, err := submit(container, crunchRunCommand)
        if err != nil {
-               log.Printf("Error queuing container run: %v", err)
+               log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
                return
        }
 
@@ -267,9 +323,9 @@ func run(container Container, crunchRunCommand, finishCommand string, priorityPo
        }
        finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
 
-       // Update container status to Running, this is a temporary workaround
-       // to avoid resubmitting queued containers because record locking isn't
-       // implemented yet.
+       // Update container status to Running. This will fail if
+       // another dispatcher (token) has already locked it. It will
+       // succeed if *this* dispatcher has already locked it.
        err = arv.Update("containers", container.UUID,
                arvadosclient.Dict{
                        "container": arvadosclient.Dict{"state": "Running"}},
@@ -277,31 +333,100 @@ func run(container Container, crunchRunCommand, finishCommand string, priorityPo
        if err != nil {
                log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
        }
+       log.Printf("Submitted container %v to slurm", container.UUID)
+       waitContainer(container, pollInterval)
+}
 
-       log.Printf("Submitted container run for %v", container.UUID)
-
-       containerUUID := container.UUID
+// Wait for a container to finish. Cancel the slurm job if the
+// container priority changes to zero before it ends.
+func waitContainer(container Container, pollInterval time.Duration) {
+       log.Printf("Monitoring container %v started", container.UUID)
+       defer log.Printf("Monitoring container %v finished", container.UUID)
+
+       pollTicker := time.NewTicker(pollInterval)
+       defer pollTicker.Stop()
+       for _ = range pollTicker.C {
+               var updated Container
+               err := arv.Get("containers", container.UUID, nil, &updated)
+               if err != nil {
+                       log.Printf("Error getting container %s: %q", container.UUID, err)
+                       continue
+               }
+               if updated.State == "Complete" || updated.State == "Cancelled" {
+                       return
+               }
+               if updated.Priority != 0 {
+                       continue
+               }
 
-       // A goroutine to terminate the runner if container priority becomes zero
-       priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
-       go func() {
-               for _ = range priorityTicker.C {
-                       var container Container
-                       err := arv.Get("containers", containerUUID, nil, &container)
-                       if err != nil {
-                               log.Printf("Error getting container info for %v: %q", container.UUID, err)
-                       } else {
-                               if container.Priority == 0 {
-                                       log.Printf("Canceling container %v", container.UUID)
-                                       priorityTicker.Stop()
-                                       cancelcmd := exec.Command("scancel", "--name="+container.UUID)
-                                       cancelcmd.Run()
-                               }
-                               if container.State == "Complete" {
-                                       priorityTicker.Stop()
-                               }
+               // Priority is zero, but state is Running or Locked
+               log.Printf("Canceling container %s", container.UUID)
+
+               err = exec.Command("scancel", "--name="+container.UUID).Run()
+               if err != nil {
+                       log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+                       if inQ, err := checkSqueue(container.UUID); err != nil {
+                               log.Printf("Error running squeue: %v", err)
+                               continue
+                       } else if inQ {
+                               log.Printf("Container %s is still in squeue; will retry", container.UUID)
+                               continue
                        }
                }
-       }()
 
+               err = arv.Update("containers", container.UUID,
+                       arvadosclient.Dict{
+                               "container": arvadosclient.Dict{"state": "Cancelled"}},
+                       nil)
+               if err != nil {
+                       log.Printf("Error updating state for container %s: %s", container.UUID, err)
+                       continue
+               }
+
+               return
+       }
+}
+
+func checkSqueue(uuid string) (bool, error) {
+       cmd := exec.Command("squeue", "--format=%j")
+       sq, err := cmd.StdoutPipe()
+       if err != nil {
+               return false, err
+       }
+       cmd.Start()
+       defer cmd.Wait()
+       scanner := bufio.NewScanner(sq)
+       found := false
+       for scanner.Scan() {
+               if scanner.Text() == uuid {
+                       found = true
+               }
+       }
+       if err := scanner.Err(); err != nil {
+               return false, err
+       }
+       return found, nil
+}
+
+var mineMutex sync.RWMutex
+var mineMap = make(map[string]bool)
+
+// Add uuid to, or remove it from, the set of "my" containers
+// i.e., ones for which this process has a goroutine running.
+func setMine(uuid string, t bool) {
+       mineMutex.Lock()
+       if t {
+               mineMap[uuid] = true
+       } else {
+               delete(mineMap, uuid)
+       }
+       mineMutex.Unlock()
+}
+
+// Check whether there is already a goroutine running for this
+// container.
+func checkMine(uuid string) bool {
+       mineMutex.RLocker().Lock()
+       defer mineMutex.RLocker().Unlock()
+       return mineMap[uuid]
 }
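
For orientation only, not part of the patch: checkSqueue works because submit() names each slurm job after its container UUID via --job-name, so "squeue --format=%j" prints job names one per line and an exact string match identifies the job. A usage sketch mirroring the call already made in waitContainer:

// Sketch only: ask slurm whether the job for this container is still
// queued or running; checkSqueue is defined above and the container
// UUID here is the fixture value used in the tests below.
if inQ, err := checkSqueue("zzzzz-dz642-queuedcontainer"); err != nil {
	log.Printf("Error running squeue: %v", err)
} else if inQ {
	log.Printf("Container is still in the slurm queue; will retry")
}
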
index e58b9e4f546af23fbab7be74c82ce7fda2424e83..3dfb7d5a3e89ae5b9cb5678e545797e4fba47160 100644 (file)
@@ -4,8 +4,8 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
 
+       "bytes"
        "fmt"
-       "io/ioutil"
        "log"
        "math"
        "net/http"
@@ -52,6 +52,7 @@ func (s *TestSuite) SetUpTest(c *C) {
        if err != nil {
                c.Fatalf("Error making arvados client: %s", err)
        }
+       os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
 }
 
 func (s *TestSuite) TearDownTest(c *C) {
@@ -76,7 +77,7 @@ func (s *TestSuite) Test_doMain(c *C) {
        }(sbatchCmd)
        sbatchCmd = func(container Container) *exec.Cmd {
                sbatchCmdLine = sbatchFunc(container).Args
-               return exec.Command("echo", container.UUID)
+               return exec.Command("sh")
        }
 
        // Override striggerCmd
@@ -89,10 +90,12 @@ func (s *TestSuite) Test_doMain(c *C) {
                        apiHost, apiToken, apiInsecure).Args
                go func() {
                        time.Sleep(5 * time.Second)
-                       arv.Update("containers", containerUUID,
-                               arvadosclient.Dict{
-                                       "container": arvadosclient.Dict{"state": "Complete"}},
-                               nil)
+                       for _, state := range []string{"Running", "Complete"} {
+                               arv.Update("containers", containerUUID,
+                                       arvadosclient.Dict{
+                                               "container": arvadosclient.Dict{"state": state}},
+                                       nil)
+                       }
                }()
                return exec.Command("echo", "strigger")
        }
@@ -122,7 +125,7 @@ func (s *TestSuite) Test_doMain(c *C) {
        c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
        c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
-               "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h 1 zzzzz-dz642-queuedcontainer"})
+               "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
 
        // There should be no queued containers now
        err = arv.List("containers", params, &containers)
@@ -138,6 +141,7 @@ func (s *TestSuite) Test_doMain(c *C) {
 
 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
        apiStubResponses := make(map[string]arvadostest.StubResponse)
+       apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
        apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
        testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
@@ -157,18 +161,18 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                Retries:   0,
        }
 
-       tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
-       c.Check(err, IsNil)
-       defer os.Remove(tempfile.Name())
-       log.SetOutput(tempfile)
+       buf := bytes.NewBuffer(nil)
+       log.SetOutput(buf)
+       defer log.SetOutput(os.Stderr)
 
        go func() {
-               time.Sleep(2 * time.Second)
+               for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+                       time.Sleep(100 * time.Millisecond)
+               }
                sigChan <- syscall.SIGTERM
        }()
 
        runQueuedContainers(2, 1, crunchCmd, crunchCmd)
 
-       buf, _ := ioutil.ReadFile(tempfile.Name())
-       c.Check(strings.Contains(string(buf), expected), Equals, true)
+       c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }
index 9c0374351b075d03e2bd0469c4b813c373308e87..5bd7f102556a8f2b532dc0c0e74790cb00d60489 100644 (file)
@@ -264,7 +264,7 @@ func (v *AzureBlobVolume) Touch(loc string) error {
        }
        return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
                "touch": fmt.Sprintf("%d", time.Now()),
-       })
+       }, nil)
 }
 
 // Mtime returns the last-modified property of a block blob.