Merge branch 'master' into 12033-multisite-search
authorTom Clegg <tclegg@veritasgenetics.com>
Thu, 17 Aug 2017 17:18:41 +0000 (13:18 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Thu, 17 Aug 2017 17:18:41 +0000 (13:18 -0400)
refs #12033

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

47 files changed:
.gitignore
apps/workbench/app/controllers/collections_controller.rb
apps/workbench/app/views/jobs/_show_log.html.erb
apps/workbench/config/application.default.yml
apps/workbench/config/application.yml.example
apps/workbench/config/load_config.rb
apps/workbench/test/controllers/collections_controller_test.rb
apps/workbench/test/integration/anonymous_access_test.rb
apps/workbench/test/integration/collections_test.rb
apps/workbench/test/integration/jobs_test.rb
apps/workbench/test/integration_helper.rb
build/build.list
build/run-build-docker-images.sh
build/run-build-packages.sh
build/run-tests.sh
sdk/cli/bin/crunch-job
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/setup.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/wf/scatter2_subwf.cwl
sdk/go/arvados/container.go
sdk/go/arvados/error.go
sdk/go/health/handler.go
sdk/perl/lib/Arvados/Request.pm
sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_put.py
services/api/app/controllers/application_controller.rb
services/keep-web/handler.go
services/keepstore/s3_volume.go
services/nodemanager/arvnodeman/baseactor.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
services/nodemanager/arvnodeman/timedcallback.py
services/nodemanager/setup.py
services/nodemanager/tests/stress_test.cwl [new file with mode: 0644]
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_computenode_dispatch_slurm.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_failure.py
services/nodemanager/tests/test_jobqueue.py
services/nodemanager/tests/test_timedcallback.py
services/nodemanager/tests/testutil.py
services/ws/event_source.go
services/ws/handler.go
services/ws/session_v0.go
tools/arvbox/lib/arvbox/docker/Dockerfile.base

index 77b98999bca91ede8ff5aab46f04473ba1f2cd99..0e876bb6f4d430eea2a79bbe34b0ee3f37c8a6fc 100644 (file)
@@ -27,3 +27,4 @@ sdk/cwl/arvados_cwl/_version.py
 services/api/config/arvados-clients.yml
 *#*
 .DS_Store
+.vscode
index da7b4666a03c753f8f4876753ac585e446fb33ad..779d95c45b874d4fec157768e19013a1c68a0022 100644 (file)
@@ -115,20 +115,10 @@ class CollectionsController < ApplicationController
   end
 
   def show_file_links
-    if Rails.configuration.keep_web_url || Rails.configuration.keep_web_download_url
-      # show_file will redirect to keep-web's directory listing
-      return show_file
-    end
-    Thread.current[:reader_tokens] = [params[:reader_token]]
-    return if false.equal?(find_object_by_uuid)
-    render layout: false
+    return show_file
   end
 
   def show_file
-    # We pipe from arv-get to send the file to the user.  Before we start it,
-    # we ask the API server if the file actually exists.  This serves two
-    # purposes: it lets us return a useful status code for common errors, and
-    # helps us figure out which token to provide to arv-get.
     # The order of searched tokens is important: because the anonymous user
     # token is passed along with every API request, we have to check it first.
     # Otherwise, it's impossible to know whether any other request succeeded
@@ -145,62 +135,18 @@ class CollectionsController < ApplicationController
       return
     end
 
-    # If we are configured to use a keep-web server, just redirect to
-    # the appropriate URL.
-    if Rails.configuration.keep_web_url or
-        Rails.configuration.keep_web_download_url
-      opts = {}
-      if usable_token == params[:reader_token]
-        opts[:path_token] = usable_token
-      elsif usable_token == Rails.configuration.anonymous_user_token
-        # Don't pass a token at all
-      else
-        # We pass the current user's real token only if it's necessary
-        # to read the collection.
-        opts[:query_token] = usable_token
-      end
-      opts[:disposition] = params[:disposition] if params[:disposition]
-      return redirect_to keep_web_url(params[:uuid], params[:file], opts)
-    end
-
-    # No keep-web server available. Get the file data with arv-get,
-    # and serve it through Rails.
-
-    file_name = params[:file].andand.sub(/^(\.\/|\/|)/, './')
-    if file_name.nil? or not coll.manifest.has_file?(file_name)
-      return render_not_found
-    end
-
-    opts = params.merge(arvados_api_token: usable_token)
-
-    # Handle Range requests. Currently we support only 'bytes=0-....'
-    if request.headers.include? 'HTTP_RANGE'
-      if m = /^bytes=0-(\d+)/.match(request.headers['HTTP_RANGE'])
-        opts[:maxbytes] = m[1]
-        size = params[:size] || '*'
-        self.response.status = 206
-        self.response.headers['Content-Range'] = "bytes 0-#{m[1]}/#{size}"
-      end
-    end
-
-    ext = File.extname(params[:file])
-    self.response.headers['Content-Type'] =
-      Rack::Mime::MIME_TYPES[ext] || 'application/octet-stream'
-    if params[:size]
-      size = params[:size].to_i
-      if opts[:maxbytes]
-        size = [size, opts[:maxbytes].to_i].min
-      end
-      self.response.headers['Content-Length'] = size.to_s
-    end
-    self.response.headers['Content-Disposition'] = params[:disposition] if params[:disposition]
-    begin
-      file_enumerator(opts).each do |bytes|
-        response.stream.write bytes
-      end
-    ensure
-      response.stream.close
+    opts = {}
+    if usable_token == params[:reader_token]
+      opts[:path_token] = usable_token
+    elsif usable_token == Rails.configuration.anonymous_user_token
+      # Don't pass a token at all
+    else
+      # We pass the current user's real token only if it's necessary
+      # to read the collection.
+      opts[:query_token] = usable_token
     end
+    opts[:disposition] = params[:disposition] if params[:disposition]
+    return redirect_to keep_web_url(params[:uuid], params[:file], opts)
   end
 
   def sharing_scopes
@@ -288,11 +234,7 @@ class CollectionsController < ApplicationController
 
   def download_link
     token = @search_sharing.first.api_token
-    if Rails.configuration.keep_web_url || Rails.configuration.keep_web_download_url
-      keep_web_url(@object.uuid, nil, {path_token: token})
-    else
-      collections_url + "/download/#{@object.uuid}/#{token}/"
-    end
+    keep_web_url(@object.uuid, nil, {path_token: token})
   end
 
   def share
@@ -468,43 +410,4 @@ class CollectionsController < ApplicationController
 
     uri.to_s
   end
-
-  # Note: several controller and integration tests rely on stubbing
-  # file_enumerator to return fake file content.
-  def file_enumerator opts
-    FileStreamer.new opts
-  end
-
-  class FileStreamer
-    include ArvadosApiClientHelper
-    def initialize(opts={})
-      @opts = opts
-    end
-    def each
-      return unless @opts[:uuid] && @opts[:file]
-
-      env = Hash[ENV].dup
-
-      require 'uri'
-      u = URI.parse(arvados_api_client.arvados_v1_base)
-      env['ARVADOS_API_HOST'] = "#{u.host}:#{u.port}"
-      env['ARVADOS_API_TOKEN'] = @opts[:arvados_api_token]
-      env['ARVADOS_API_HOST_INSECURE'] = "true" if Rails.configuration.arvados_insecure_https
-
-      bytesleft = @opts[:maxbytes].andand.to_i || 2**16
-      io = IO.popen([env, 'arv-get', "#{@opts[:uuid]}/#{@opts[:file]}"], 'rb')
-      while bytesleft > 0 && (buf = io.read([bytesleft, 2**16].min)) != nil
-        # shrink the bytesleft count, if we were given a maximum byte
-        # count to read
-        if @opts.include? :maxbytes
-          bytesleft = bytesleft - buf.length
-        end
-        yield buf
-      end
-      io.close
-      # "If ios is opened by IO.popen, close sets $?."
-      # http://www.ruby-doc.org/core-2.1.3/IO.html#method-i-close
-      Rails.logger.warn("#{@opts[:uuid]}/#{@opts[:file]}: #{$?}") if $? != 0
-    end
-  end
 end
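
The rewritten show_file keeps only the keep-web redirect path, choosing where the token travels: a reader token goes into the URL path (so the link stays shareable), the anonymous token is omitted entirely, and a signed-in user's token is passed only as a query parameter. A minimal Python sketch of that decision logic (the helper name and structure are illustrative, not part of the patch):

    def keep_web_token_opts(usable_token, reader_token, anonymous_token):
        # Mirrors show_file above: decide how (and whether) to forward the token.
        opts = {}
        if usable_token == reader_token:
            opts['path_token'] = usable_token    # shareable link: token in the path
        elif usable_token == anonymous_token:
            pass                                 # public data: send no token at all
        else:
            opts['query_token'] = usable_token   # user's own token, query string only
        return opts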
index b4ede751183206e8259afd6df196f7a368c2bb7e..821b4bcdf27de248d1b3b29a81e0fb4e18bad04b 100644 (file)
@@ -74,7 +74,7 @@ var makeFilter = function() {
     $("#log-viewer-download-pane").show();
     var headers = {};
     if (log_size > log_maxbytes) {
-      headers['Range'] = 'bytes=0-' + log_maxbytes;
+      headers['Range'] = 'bytes=0-' + (log_maxbytes - 1);
     }
     var ajax_opts = { dataType: 'text', headers: headers };
     load_log();
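
The `- 1` matters because HTTP byte ranges are inclusive at both ends: `Range: bytes=0-99` requests the first 100 bytes, so the old header asked keep-web for one byte more than log_maxbytes. In Python terms:

    log_maxbytes = 100
    header = 'bytes=0-%d' % (log_maxbytes - 1)     # inclusive range: exactly 100 bytes
    assert ((log_maxbytes - 1) - 0 + 1) == log_maxbytes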
index 9f6c1ed338042764c3f38d506753caab7682350e..da20573abb859b96a5202983ab9e77e1703ac25f 100644 (file)
@@ -99,6 +99,9 @@ test:
   profiling_enabled: true
   secret_token: <%= rand(2**256).to_s(36) %>
   secret_key_base: <%= rand(2**256).to_s(36) %>
+  # This setting allows Workbench to start when running tests; set it to a
+  # correct value when testing the relevant features.
+  keep_web_url: http://example.com/c=%{uuid_or_pdh}
 
   # When you run the Workbench's integration tests, it starts the API
   # server as a dependency.  These settings should match the API
@@ -243,8 +246,8 @@ common:
   shell_in_a_box_url: false
 
   # Format of preview links. If false, use keep_web_download_url
-  # instead, and disable inline preview. If both are false, use
-  # Workbench's built-in file download/preview mechanism.
+  # instead, and disable inline preview.
+  # If both are false, Workbench won't start; this configuration is mandatory.
   #
   # Examples:
   # keep_web_url: https://%{uuid_or_pdh}.collections.uuid_prefix.arvadosapi.com
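
Workbench expands the %{uuid_or_pdh} placeholder with the collection's UUID or portable data hash when building each preview link. A rough Python equivalent of the substitution (the UUID below is a placeholder value):

    template = 'https://%(uuid_or_pdh)s.collections.uuid_prefix.arvadosapi.com'
    template % {'uuid_or_pdh': 'zzzzz-4zz18-zzzzzzzzzzzzzzz'}
    # -> 'https://zzzzz-4zz18-zzzzzzzzzzzzzzz.collections.uuid_prefix.arvadosapi.com'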
index 75edb8789d7afb26a82b628293be36e29dad9c8e..85df228f4855f755da2bf13fca31b6f85c689a0d 100644 (file)
@@ -23,6 +23,10 @@ development:
   arvados_v1_base: https://arvados.local:3030/arvados/v1
   arvados_insecure_https: true
 
+  # You need to configure at least one of these:
+  keep_web_url: false
+  keep_web_download_url: false
+
 production:
   # At minimum, you need a nice long randomly generated secret_token here.
   secret_token: ~
@@ -31,3 +35,7 @@ production:
   arvados_login_base: https://arvados.local:3030/login
   arvados_v1_base: https://arvados.local:3030/arvados/v1
   arvados_insecure_https: false
+
+  # You need to configure at least one of these:
+  keep_web_url: false
+  keep_web_download_url: false
index e2185a896963c23f9d2cd2c2036c885e196ae62f..d8d4dff567a45e63e27e744ab8b0f9a0da36c82b 100644 (file)
@@ -46,14 +46,26 @@ EOS
       cfg.send "#{k}=", v
     end
   end
-  if !nils.empty?
+  if !nils.empty? and not ::Rails.groups.include?('assets')
     raise <<EOS
 Refusing to start in #{::Rails.env.to_s} mode with missing configuration.
 
 The following configuration settings must be specified in
 config/application.yml:
 * #{nils.join "\n* "}
 
+EOS
+  end
+  # Refuse to start if keep-web isn't configured
+  if not (config.keep_web_url or config.keep_web_download_url) and not ::Rails.groups.include?('assets')
+    raise <<EOS
+Refusing to start in #{::Rails.env.to_s} mode with missing configuration.
+
+Keep-web service must be configured in config/application.yml:
+* keep_web_url
+* keep_web_download_url
+
 EOS
   end
 end
index 26d6fda85e53f87bac69c47ea3e2e78639f584d0..773a4f45714b515d664ec290ecb61e32bcca5695 100644 (file)
@@ -23,15 +23,6 @@ class CollectionsControllerTest < ActionController::TestCase
       end
   end
 
-  def stub_file_content
-    # For the duration of the current test case, stub file download
-    # content with a randomized (but recognizable) string. Return the
-    # string, the test case can use it in assertions.
-    txt = 'the quick brown fox ' + rand(2**32).to_s
-    @controller.stubs(:file_enumerator).returns([txt])
-    txt
-  end
-
   def collection_params(collection_name, file_name=nil)
     uuid = api_fixture('collections')[collection_name.to_s]['uuid']
     params = {uuid: uuid, id: uuid}
@@ -75,17 +66,14 @@ class CollectionsControllerTest < ActionController::TestCase
   end
 
   test "download a file with spaces in filename" do
+    setup_for_keep_web
     collection = api_fixture('collections')['w_a_z_file']
-    fakepipe = IO.popen(['echo', '-n', 'w a z'], 'rb')
-    IO.expects(:popen).with { |cmd, mode|
-      cmd.include? "#{collection['uuid']}/w a z"
-    }.returns(fakepipe)
     get :show_file, {
       uuid: collection['uuid'],
       file: 'w a z'
     }, session_for(:active)
-    assert_response :success
-    assert_equal 'w a z', response.body
+    assert_response :redirect
+    assert_match /w%20a%20z/, response.redirect_url
   end
 
   test "viewing a collection fetches related projects" do
@@ -137,20 +125,18 @@ class CollectionsControllerTest < ActionController::TestCase
     params[:reader_token] = api_fixture("api_client_authorizations",
                                         "active_all_collections", "api_token")
     get(:show_file_links, params)
-    assert_response :success
-    assert_equal([['.', 'foo', 3]], assigns(:object).files)
+    assert_response :redirect
     assert_no_session
   end
 
   test "fetching collection file with reader token" do
-    expected = stub_file_content
+    setup_for_keep_web
     params = collection_params(:foo_file, "foo")
     params[:reader_token] = api_fixture("api_client_authorizations",
                                         "active_all_collections", "api_token")
     get(:show_file, params)
-    assert_response :success
-    assert_equal(expected, @response.body,
-                 "failed to fetch a Collection file with a reader token")
+    assert_response :redirect
+    assert_match /foo/, response.redirect_url
     assert_no_session
   end
 
@@ -163,24 +149,23 @@ class CollectionsControllerTest < ActionController::TestCase
   end
 
   test "getting a file from Keep" do
+    setup_for_keep_web
     params = collection_params(:foo_file, 'foo')
     sess = session_for(:active)
-    expect_content = stub_file_content
     get(:show_file, params, sess)
-    assert_response :success
-    assert_equal(expect_content, @response.body,
-                 "failed to get a correct file from Keep")
+    assert_response :redirect
+    assert_match /foo/, response.redirect_url
   end
 
   test 'anonymous download' do
+    setup_for_keep_web
     config_anonymous true
-    expect_content = stub_file_content
     get :show_file, {
       uuid: api_fixture('collections')['user_agreement_in_anonymously_accessible_project']['uuid'],
       file: 'GNU_General_Public_License,_version_3.pdf',
     }
-    assert_response :success
-    assert_equal expect_content, response.body
+    assert_response :redirect
+    assert_match /GNU_General_Public_License/, response.redirect_url
   end
 
   test "can't get a file from Keep without permission" do
@@ -190,22 +175,14 @@ class CollectionsControllerTest < ActionController::TestCase
     assert_response 404
   end
 
-  test "trying to get a nonexistent file from Keep returns a 404" do
-    params = collection_params(:foo_file, 'gone')
-    sess = session_for(:admin)
-    get(:show_file, params, sess)
-    assert_response 404
-  end
-
   test "getting a file from Keep with a good reader token" do
+    setup_for_keep_web
     params = collection_params(:foo_file, 'foo')
     read_token = api_fixture('api_client_authorizations')['active']['api_token']
     params[:reader_token] = read_token
-    expect_content = stub_file_content
     get(:show_file, params)
-    assert_response :success
-    assert_equal(expect_content, @response.body,
-                 "failed to get a correct file from Keep using a reader token")
+    assert_response :redirect
+    assert_match /foo/, response.redirect_url
     assert_not_equal(read_token, session[:arvados_api_token],
                      "using a reader token set the session's API token")
   end
@@ -229,25 +206,22 @@ class CollectionsControllerTest < ActionController::TestCase
   end
 
   test "can get a file with an unpermissioned auth but in-scope reader token" do
+    setup_for_keep_web
     params = collection_params(:foo_file, 'foo')
     sess = session_for(:expired)
     read_token = api_fixture('api_client_authorizations')['active']['api_token']
     params[:reader_token] = read_token
-    expect_content = stub_file_content
     get(:show_file, params, sess)
-    assert_response :success
-    assert_equal(expect_content, @response.body,
-                 "failed to get a correct file from Keep using a reader token")
+    assert_response :redirect
     assert_not_equal(read_token, session[:arvados_api_token],
                      "using a reader token set the session's API token")
   end
 
   test "inactive user can retrieve user agreement" do
+    setup_for_keep_web
     ua_collection = api_fixture('collections')['user_agreement']
     # Here we don't test whether the agreement can be retrieved from
-    # Keep. We only test that show_file decides to send file content,
-    # so we use the file content stub.
-    stub_file_content
+    # Keep. We only test that show_file decides to send file content.
     get :show_file, {
       uuid: ua_collection['uuid'],
       file: ua_collection['manifest_text'].match(/ \d+:\d+:(\S+)/)[1]
@@ -255,7 +229,7 @@ class CollectionsControllerTest < ActionController::TestCase
     assert_nil(assigns(:unsigned_user_agreements),
                "Did not skip check_user_agreements filter " +
                "when showing the user agreement.")
-    assert_response :success
+    assert_response :redirect
   end
 
   test "requesting nonexistent Collection returns 404" do
@@ -263,37 +237,12 @@ class CollectionsControllerTest < ActionController::TestCase
                     :active, 404)
   end
 
-  test "use a reasonable read buffer even if client requests a huge range" do
-    fakefiledata = mock
-    IO.expects(:popen).returns(fakefiledata)
-    fakefiledata.expects(:read).twice.with() do |length|
-      # Fail the test if read() is called with length>1MiB:
-      length < 2**20
-      ## Force the ActionController::Live thread to lose the race to
-      ## verify that @response.body.length actually waits for the
-      ## response (see below):
-      # sleep 3
-    end.returns("foo\n", nil)
-    fakefiledata.expects(:close)
-    foo_file = api_fixture('collections')['foo_file']
-    @request.headers['Range'] = 'bytes=0-4294967296/*'
-    get :show_file, {
-      uuid: foo_file['uuid'],
-      file: foo_file['manifest_text'].match(/ \d+:\d+:(\S+)/)[1]
-    }, session_for(:active)
-    # Wait for the whole response to arrive before deciding whether
-    # mocks' expectations were met. Otherwise, Mocha will fail the
-    # test depending on how slowly the ActionController::Live thread
-    # runs.
-    @response.body.length
-  end
-
   test "show file in a subdirectory of a collection" do
+    setup_for_keep_web
     params = collection_params(:collection_with_files_in_subdir, 'subdir2/subdir3/subdir4/file1_in_subdir4.txt')
-    expect_content = stub_file_content
     get(:show_file, params, session_for(:user1_with_load))
-    assert_response :success
-    assert_equal(expect_content, @response.body, "failed to get a correct file from Keep")
+    assert_response :redirect
+    assert_match /subdir2\/subdir3\/subdir4\/file1_in_subdir4\.txt/, response.redirect_url
   end
 
   test 'provenance graph' do
@@ -521,7 +470,6 @@ class CollectionsControllerTest < ActionController::TestCase
   def setup_for_keep_web cfg='https://%{uuid_or_pdh}.example', dl_cfg=false
     Rails.configuration.keep_web_url = cfg
     Rails.configuration.keep_web_download_url = dl_cfg
-    @controller.expects(:file_enumerator).never
   end
 
   %w(uuid portable_data_hash).each do |id_type|
index 6141cb3ca1e87989bf7713120f8969f2d75c64a7..6971c39f3385f59674a681e7ccb38cf1f0d9f2b4 100644 (file)
@@ -5,6 +5,8 @@
 require 'integration_helper'
 
 class AnonymousAccessTest < ActionDispatch::IntegrationTest
+  include KeepWebConfig
+
   # These tests don't do state-changing API calls. Save some time by
   # skipping the database reset.
   reset_api_fixtures :after_each_test, false
@@ -117,10 +119,12 @@ class AnonymousAccessTest < ActionDispatch::IntegrationTest
   end
 
   test 'view file' do
+    use_keep_web_config
+
     magic = rand(2**512).to_s 36
-    CollectionsController.any_instance.stubs(:file_enumerator).returns([magic])
-    collection = api_fixture('collections')['public_text_file']
-    visit '/collections/' + collection['uuid']
+    owner = api_fixture('groups')['anonymously_accessible_project']['uuid']
+    col = upload_data_and_get_collection(magic, 'admin', "Hello\\040world.txt", owner)
+    visit '/collections/' + col.uuid
     find('tr,li', text: 'Hello world.txt').
       find('a[title~=View]').click
     assert_text magic
index 8619858dfeee90889cc411b2e06677caa2291134..71cfe38abfda32b2d5b5ce943ecdbf26f46ff52b 100644 (file)
@@ -6,6 +6,8 @@ require 'integration_helper'
 require_relative 'integration_test_utils'
 
 class CollectionsTest < ActionDispatch::IntegrationTest
+  include KeepWebConfig
+
   setup do
     need_javascript
   end
@@ -44,7 +46,7 @@ class CollectionsTest < ActionDispatch::IntegrationTest
   test "creating and uncreating a sharing link" do
     coll_uuid = api_fixture("collections", "collection_owned_by_active", "uuid")
     download_link_re =
-      Regexp.new(Regexp.escape("/collections/download/#{coll_uuid}/"))
+      Regexp.new(Regexp.escape("/c=#{coll_uuid}/"))
     visit page_with_token("active_trustedclient", "/collections/#{coll_uuid}")
     within "#sharing-button" do
       check_sharing(:on, download_link_re)
@@ -53,10 +55,20 @@ class CollectionsTest < ActionDispatch::IntegrationTest
   end
 
   test "can download an entire collection with a reader token" do
-    Capybara.current_driver = :rack_test
-    CollectionsController.any_instance.
-      stubs(:file_enumerator).returns(["foo\n", "file\n"])
-    uuid = api_fixture('collections')['foo_file']['uuid']
+    use_keep_web_config
+
+    token = api_fixture('api_client_authorizations')['active']['api_token']
+    data = "foo\nfile\n"
+    datablock = `echo -n #{data.shellescape} | ARVADOS_API_TOKEN=#{token.shellescape} arv-put --no-progress --raw -`.strip
+    assert $?.success?, $?
+
+    col = nil
+    use_token 'active' do
+      mtxt = ". #{datablock} 0:#{data.length}:foo\n"
+      col = Collection.create(manifest_text: mtxt)
+    end
+
+    uuid = col.uuid
     token = api_fixture('api_client_authorizations')['active_all_collections']['api_token']
     url_head = "/collections/download/#{uuid}/#{token}/"
     visit url_head
@@ -78,10 +90,8 @@ class CollectionsTest < ActionDispatch::IntegrationTest
     end
     assert_equal(['foo'], hrefs.compact.sort,
                  "download page did provide strictly file links")
-    within "#collection_files" do
-      click_link "foo"
-      assert_equal("foo\nfile\n", page.html)
-    end
+    click_link "foo"
+    assert_text "foo\nfile\n"
   end
 
   test "combine selected collections into new collection" do
index 8a60a84459243d01d31f4b564206464738f2a750..bfed03b14bd223085dd7fa704eb8494c951ec198 100644 (file)
@@ -39,37 +39,25 @@ class JobsTest < ActionDispatch::IntegrationTest
     assert_selector 'a[href="/"]', text: 'Go to dashboard'
   end
 
-  test "view job log" do
-    job = api_fixture('jobs')['job_with_real_log']
-
-    IO.expects(:popen).returns(fakepipe_with_log_data)
-
-    visit page_with_token("active", "/jobs/#{job['uuid']}")
-    assert page.has_text? job['script_version']
-
-    find(:xpath, "//a[@href='#Log']").click
-    wait_for_ajax
-    assert page.has_text? 'Started at'
-    assert page.has_text? 'Finished at'
-    assert page.has_text? 'log message 1'
-    assert page.has_text? 'log message 2'
-    assert page.has_text? 'log message 3'
-    assert page.has_no_text? 'Showing only 100 bytes of this log'
-  end
-
   test 'view partial job log' do
+    need_selenium 'to be able to see the CORS response headers (PhantomJS 1.9.8 does not)'
+    use_keep_web_config
+
     # This config will be restored during teardown by ../test_helper.rb:
     Rails.configuration.log_viewer_max_bytes = 100
 
-    IO.expects(:popen).returns(fakepipe_with_log_data)
-    job = api_fixture('jobs')['job_with_real_log']
-
-    visit page_with_token("active", "/jobs/#{job['uuid']}")
-    assert page.has_text? job['script_version']
-
-    find(:xpath, "//a[@href='#Log']").click
+    logdata = fakepipe_with_log_data.read
+    job_uuid = api_fixture('jobs')['running']['uuid']
+    logcollection = upload_data_and_get_collection(logdata, 'active', "#{job_uuid}.log.txt")
+    job = nil
+    use_token 'active' do
+      job = Job.find job_uuid
+      job.update_attributes log: logcollection.portable_data_hash
+    end
+    visit page_with_token 'active', '/jobs/'+job.uuid
+    find('a[href="#Log"]').click
     wait_for_ajax
-    assert page.has_text? 'Showing only 100 bytes of this log'
+    assert_text 'Showing only 100 bytes of this log'
   end
 
   test 'view log via keep-web redirect' do
index 5d2cefe447c4fb84e29eae4dc93bea6fd3748d90..ef2779cc3e78eedb556ce2dc7114a6e2466112b0 100644 (file)
@@ -163,7 +163,6 @@ module KeepWebConfig
     @kwdport = getport 'keep-web-dl-ssl'
     Rails.configuration.keep_web_url = "https://localhost:#{@kwport}/c=%{uuid_or_pdh}"
     Rails.configuration.keep_web_download_url = "https://localhost:#{@kwdport}/c=%{uuid_or_pdh}"
-    CollectionsController.any_instance.expects(:file_enumerator).never
   end
 end
 
@@ -241,3 +240,19 @@ class ActionDispatch::IntegrationTest
     end
   end
 end
+
+def upload_data_and_get_collection(data, user, filename, owner_uuid=nil)
+  token = api_fixture('api_client_authorizations')[user]['api_token']
+  datablock = `echo -n #{data.shellescape} | ARVADOS_API_TOKEN=#{token.shellescape} arv-put --no-progress --raw -`.strip
+  assert $?.success?, $?
+  col = nil
+  use_token user do
+    mtxt = ". #{datablock} 0:#{data.length}:#{filename}\n"
+    if owner_uuid
+      col = Collection.create(manifest_text: mtxt, owner_uuid: owner_uuid)
+    else
+      col = Collection.create(manifest_text: mtxt)
+    end
+  end
+  return col
+end
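
The new helper builds a one-stream collection manifest by hand: `arv-put --raw` returns the signed locator of the uploaded data block, and the manifest line `. <locator> 0:<length>:<filename>` names that block as a single file in the root stream. A Python sketch of the same flow, assuming arv-put is on PATH and ARVADOS_API_TOKEN is set in the environment:

    import subprocess

    def upload_data_and_get_manifest(data, filename):
        # Pipe the bytes through arv-put to obtain a signed block locator.
        proc = subprocess.run(['arv-put', '--no-progress', '--raw', '-'],
                              input=data, stdout=subprocess.PIPE, check=True)
        datablock = proc.stdout.decode().strip()
        # Describe the block as one file in a single-stream manifest.
        return '. {} 0:{}:{}\n'.format(datablock, len(data), filename)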
index b71ac890b209908608d2eb59d35fb37bd9f85e43..55d580e0ff92e7d95057464ee1f4758a39886c14 100644 (file)
@@ -32,7 +32,7 @@ debian8,debian9,ubuntu1204,ubuntu1404,ubuntu1604,centos7|docker-py|1.7.2|2|pytho
 debian8,debian9,ubuntu1204,centos7|six|1.10.0|2|python3|all
 debian8,debian9,ubuntu1204,ubuntu1404,centos7|requests|2.12.4|2|python3|all
 debian8,debian9,ubuntu1204,ubuntu1404,ubuntu1604,centos7|websocket-client|0.37.0|2|python3|all
-ubuntu1204|requests|2.12.4|2|python|all
+ubuntu1204,ubuntu1404|requests|2.4.3|2|python|all
 ubuntu1204,centos7|contextlib2|0.5.4|2|python|all
 ubuntu1204,centos7|isodate|0.5.4|2|python|all
 centos7|daemon|2.1.1|2|python|all
index 9f4f8125debb50836bd44604aa98529dfc6932e1..fd7b38e8b64e66e3c18275578890a84c3ccdd2a4 100755 (executable)
@@ -128,6 +128,8 @@ timer_reset
 # clean up the docker build environment
 cd "$WORKSPACE"
 
+title "Starting arvbox build localdemo"
+
 tools/arvbox/bin/arvbox build localdemo
 ECODE=$?
 
@@ -136,6 +138,8 @@ if [[ "$ECODE" != "0" ]]; then
     EXITCODE=$(($EXITCODE + $ECODE))
 fi
 
+title "Starting arvbox build dev"
+
 tools/arvbox/bin/arvbox build dev
 
 ECODE=$?
index af90eb6f2dabad701c4b211a4733aad73e1b58f5..2cde946d494b566d1681f3c16fd851e5a9fb7fa5 100755 (executable)
@@ -620,6 +620,7 @@ if [[ "$?" == "0" ]] ; then
       \cp config/application.yml.example config/application.yml -f
       \cp config/environments/production.rb.example config/environments/production.rb -f
       sed -i 's/secret_token: ~/secret_token: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/' config/application.yml
+      sed -i 's/keep_web_url: false/keep_web_url: exampledotcom/' config/application.yml
 
       RAILS_ENV=production RAILS_GROUPS=assets bundle exec rake npm:install >/dev/null
       RAILS_ENV=production RAILS_GROUPS=assets bundle exec rake assets:precompile >/dev/null
index 15e89fa9273db020c5a7651b28ee22d049fbbcd4..70ea0ef073ba4a8d726ebce8565f1a4e659ab525 100755 (executable)
@@ -81,7 +81,7 @@ services/keepstore
 services/keep-balance
 services/login-sync
 services/nodemanager
-services/nodemanager-integration
+services/nodemanager_integration
 services/crunch-run
 services/crunch-dispatch-local
 services/crunch-dispatch-slurm
@@ -547,6 +547,9 @@ do_test() {
         apps/workbench_units | apps/workbench_functionals | apps/workbench_integration)
             suite=apps/workbench
             ;;
+        services/nodemanager | services/nodemanager_integration)
+            suite=services/nodemanager_suite
+            ;;
         *)
             suite="${1}"
             ;;
@@ -863,11 +866,11 @@ test_login-sync() {
 }
 do_test services/login-sync login-sync
 
-test_nodemanager-integration() {
+test_nodemanager_integration() {
     cd "$WORKSPACE/services/nodemanager" \
-        && tests/integration_test.py ${testargs[services/nodemanager-integration]}
+        && tests/integration_test.py ${testargs[services/nodemanager_integration]}
 }
-do_test services/nodemanager-integration nodemanager-integration
+do_test services/nodemanager_integration nodemanager_integration
 
 for p in "${pythonstuff[@]}"
 do
index 5a92176e7f02fba11525190ccee511339819e1d2..fd598c9dd37bbfdd88aa12e2e84d5d73c6929bf9 100755 (executable)
@@ -1022,7 +1022,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
   delete $Jobstep->{tempfail};
 
   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
-  $Jobstep->{'arvados_task'}->save;
+  retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
 
   splice @jobstep_todo, $todo_ptr, 1;
   --$todo_ptr;
@@ -1205,7 +1205,7 @@ sub reapchildren
             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
             exit_status_s($childstatus)));
       $Jobstep->{'arvados_task'}->{success} = 0;
-      $Jobstep->{'arvados_task'}->save;
+      retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
       $task_success = 0;
     }
 
@@ -1258,7 +1258,7 @@ sub reapchildren
     $Jobstep->{exitcode} = $childstatus;
     $Jobstep->{finishtime} = time;
     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
-    $Jobstep->{'arvados_task'}->save;
+    retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
     process_stderr_final ($jobstepidx);
     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
                               length($Jobstep->{'arvados_task'}->{output}),
@@ -1544,7 +1544,7 @@ sub preprocess_stderr
         $st->{node}->{fail_count}++;
       }
     }
-    elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
+    elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
       $jobstep[$jobstepidx]->{tempfail} = 1;
       if (defined($job_slot_index)) {
         $slot[$job_slot_index]->{node}->{fail_count}++;
@@ -2177,8 +2177,22 @@ sub retry_op {
   # that can be retried, the second function will be called with
   # the current try count (0-based), next try time, and error message.
   my $operation = shift;
-  my $retry_callback = shift;
+  my $op_text = shift;
   my $retries = retry_count();
+  my $retry_callback = sub {
+    my ($try_count, $next_try_at, $errmsg) = @_;
+    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
+    $errmsg =~ s/\s/ /g;
+    $errmsg =~ s/\s+$//;
+    my $retry_msg;
+    if ($next_try_at < time) {
+      $retry_msg = "Retrying.";
+    } else {
+      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
+      $retry_msg = "Retrying at $next_try_fmt.";
+    }
+    Log(undef, "$op_text failed: $errmsg. $retry_msg");
+  };
   foreach my $try_count (0..$retries) {
     my $next_try = time + (2 ** $try_count);
     my $result = eval { $operation->(@_); };
@@ -2201,25 +2215,11 @@ sub api_call {
   # This function will call that method, retrying as needed until
   # the current retry_count is exhausted, with a log on the first failure.
   my $method_name = shift;
-  my $log_api_retry = sub {
-    my ($try_count, $next_try_at, $errmsg) = @_;
-    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
-    $errmsg =~ s/\s/ /g;
-    $errmsg =~ s/\s+$//;
-    my $retry_msg;
-    if ($next_try_at < time) {
-      $retry_msg = "Retrying.";
-    } else {
-      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
-      $retry_msg = "Retrying at $next_try_fmt.";
-    }
-    Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
-  };
   my $method = $arv;
   foreach my $key (split(/\//, $method_name)) {
     $method = $method->{$key};
   }
-  return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
+  return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
 }
 
 sub exit_status_s {
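
retry_op now derives its log callback from a short operation description and retries with exponential backoff: attempt n waits until `time + 2**n` before the next try. The same pattern as a Python sketch:

    import time

    def retry_op(operation, op_text, retries=3):
        # Retry with exponential backoff (1s, 2s, 4s, ...), logging each failure.
        for try_count in range(retries + 1):
            next_try = time.time() + 2 ** try_count
            try:
                return operation()
            except Exception as err:
                if try_count == retries:
                    raise
                print('%s failed: %s. Retrying.' % (op_text, err))
                time.sleep(max(0, next_try - time.time()))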
index 4ab65d9d8774708613787b3a694f64bf876004da..769a63bce3f56763e7fa1767317d5af9828a03d0 100644 (file)
@@ -363,6 +363,9 @@ class RunnerContainer(Runner):
         if self.arvrunner.trash_intermediate:
             command.append("--trash-intermediate")
 
+        if self.arvrunner.project_uuid:
+            command.append("--project-uuid="+self.arvrunner.project_uuid)
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command
index 35a068f91be7b28b7eebd0f32538e9222367d8a1..b667dac1ca5cec6f272c390be8fcd17e1628764c 100644 (file)
@@ -15,7 +15,7 @@ class ArvadosCommandTool(CommandLineTool):
         self.arvrunner = arvrunner
         self.work_api = kwargs["work_api"]
 
-    def makeJobRunner(self, use_container=True):
+    def makeJobRunner(self, **kwargs):
         if self.work_api == "containers":
             return ArvadosContainer(self.arvrunner)
         elif self.work_api == "jobs":
index 572519fdcf7e6c45218e8e2352366a90eb87564c..56a553eab9d45393edcfad14fb846942c04f77d3 100644 (file)
@@ -51,7 +51,7 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20170727112954',
+          'cwltool==1.0.20170811195303',
           'schema-salad==2.6.20170712194300',
           'typing==3.5.3.0',
           'ruamel.yaml==0.13.7',
index 8ab0a8de9c2448e999475b7ac8735dcedc928099..49545a83dc7ac34eea9acc11dc3e022f2839f22c 100644 (file)
@@ -860,6 +860,31 @@ class TestSubmit(unittest.TestCase):
                          stubs.expect_container_request_uuid + '\n')
 
 
+    @stubs
+    def test_submit_container_project(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--project-uuid="+project_uuid,
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["owner_uuid"] = project_uuid
+        expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                                       '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid,
+                                       '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+
     @stubs
     def test_submit_job_runner_image(self, stubs):
         capture_stdout = cStringIO.StringIO()
index 191f80cbcb11ff9ac44c8e08f37df13e375e4243..2a9cea5802e3437df58dbea37da8cde9f718ec88 100644 (file)
@@ -6,6 +6,7 @@
   "$graph": [
     {
       "class": "Workflow",
+      "cwlVersion": "v1.0",
       "hints": [],
       "id": "#main",
       "inputs": [
index 7d39d678f8ec02c52f6446461c4de9f8e95c142a..7e588be17bb16c04cdbd6098b8dbff8f7c599d18 100644 (file)
@@ -31,7 +31,7 @@ type Mount struct {
        Path              string      `json:"path"`
        Content           interface{} `json:"content"`
        ExcludeFromOutput bool        `json:"exclude_from_output"`
-       Capacity          int64       `json:capacity`
+       Capacity          int64       `json:"capacity"`
 }
 
 // RuntimeConstraints specify a container's compute resources (RAM,
index 29eebdbf729d557a88e121b582cdd78171e31bdd..773a2e6f9c7d787406511f85a6a5585596153738 100644 (file)
@@ -21,7 +21,7 @@ type TransactionError struct {
 }
 
 func (e TransactionError) Error() (s string) {
-       s = fmt.Sprintf("request failed: %s", e.URL)
+       s = fmt.Sprintf("request failed: %s", e.URL.String())
        if e.Status != "" {
                s = s + ": " + e.Status
        }
index f19511ea70791dbd17a201825bfaadc00515082c..81b9587c36aca32d9495307af2ede17112b525aa 100644 (file)
@@ -44,7 +44,7 @@ type Handler struct {
        Routes Routes
 
        // If non-nil, Log is called after handling each request. The
-       // error argument is nil if the request was succesfully
+       // error argument is nil if the request was successfully
        // authenticated and served, even if the health check itself
        // failed.
        Log func(*http.Request, error)
index 03d542805161481f4023c7c9084a2e0cce3c7b16..4523f7d6b3ac38561e17c50b889201166f22baad 100644 (file)
@@ -46,9 +46,12 @@ sub process_request
     $self->{'req'} = new HTTP::Request (%req);
     $self->{'req'}->header('Authorization' => ('OAuth2 ' . $self->{'authToken'})) if $self->{'authToken'};
     $self->{'req'}->header('Accept' => 'application/json');
+
+    # allow_nonref lets us encode JSON::true and JSON::false, see #12078
+    my $json = JSON->new->allow_nonref;
     my ($p, $v);
     while (($p, $v) = each %{$self->{'queryParams'}}) {
-        $content{$p} = (ref($v) eq "") ? $v : JSON::encode_json($v);
+        $content{$p} = (ref($v) eq "") ? $v : $json->encode($v);
     }
     my $content;
     while (($p, $v) = each %content) {
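
allow_nonref is needed because Perl's JSON module refuses by default to encode a bare (non-reference) scalar such as JSON::true; it normally expects a top-level array or hash. Python's json module, by comparison, encodes bare booleans out of the box, which is the behavior the patch opts into:

    import json
    json.dumps(True)   # 'true' -- the analogue of JSON->new->allow_nonref->encode(JSON::true)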
index afd9bdcd89d07b2b4447b059a33b49ccbe483a6c..ec4ae8fb6f971a8d19307a58311b2dec8eed70bc 100644 (file)
@@ -710,6 +710,7 @@ class ArvPutUploadJob(object):
             elif file_in_local_collection.permission_expired():
                 # Permission token expired, re-upload file. This will change whenever
                 # we have a API for refreshing tokens.
+                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                 should_upload = True
                 self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
index ce6618132d011056c13580c9497abf4072cc7dc3..346167846cd5ed185453ae85553a1676718e53d6 100644 (file)
@@ -9,21 +9,23 @@ standard_library.install_aliases()
 from builtins import str
 from builtins import range
 import apiclient
+import datetime
+import hashlib
+import json
 import mock
 import os
 import pwd
+import random
 import re
 import shutil
 import subprocess
 import sys
 import tempfile
+import threading
 import time
 import unittest
-import yaml
-import threading
-import hashlib
-import random
 import uuid
+import yaml
 
 import arvados
 import arvados.commands.put as arv_put
@@ -727,6 +729,9 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         cls.ENVIRON = os.environ.copy()
         cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
 
+    def datetime_to_hex(self, dt):
+        return hex(int(time.mktime(dt.timetuple())))[2:]
+
     def setUp(self):
         super(ArvPutIntegrationTest, self).setUp()
         arv_put.api_client = None
@@ -840,6 +845,49 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         self.assertEqual(1, len(collection_list))
         return collection_list[0]
 
+    def test_expired_token_invalidates_cache(self):
+        self.authorize_with('active')
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+            f.write('foo')
+        # Upload a directory and get the cache file name
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+        self.assertEqual(p.returncode, 0)
+        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+                                   err.decode()).groups()[0]
+        self.assertTrue(os.path.isfile(cache_filepath))
+        # Load the cache file contents and modify the manifest to simulate
+        # an expired access token
+        with open(cache_filepath, 'r') as c:
+            cache = json.load(c)
+        self.assertRegex(cache['manifest'], r'\+A\S+\@')
+        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+        cache['manifest'] = re.sub(
+            r'\@.*? ',
+            "@{} ".format(self.datetime_to_hex(a_month_ago)),
+            cache['manifest'])
+        with open(cache_filepath, 'w') as c:
+            c.write(json.dumps(cache))
+        # Re-run the upload and expect to get an invalid cache message
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(
+            err.decode(),
+            r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
+        self.assertEqual(p.returncode, 0)
+        # Confirm that the resulting cache is different from the last run.
+        with open(cache_filepath, 'r') as c2:
+            new_cache = json.load(c2)
+        self.assertNotEqual(cache['manifest'], new_cache['manifest'])
+
     def test_put_collection_with_later_update(self):
         tmpdir = self.make_tmpdir()
         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
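
The expired-token test works by rewriting the signature hint on each block locator in the cached manifest. A signed Keep locator has the form `<md5>+<size>+A<signature>@<expiry>`, where the expiry is a hex-encoded Unix timestamp; datetime_to_hex above computes that hex value. For example:

    import datetime, time
    a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
    expiry = hex(int(time.mktime(a_month_ago.timetuple())))[2:]
    # Splicing this in after the '@' yields a locator Keep will treat as stale.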
index 3a66f5328d969d5f56cc02865b6440fbf08cdff8..a3d8f082adf82c3330b034ba0dde581d0ca493b0 100644 (file)
@@ -87,9 +87,6 @@ class ApplicationController < ActionController::Base
   end
 
   def index
-    if @select.nil? || @select.include?("id")
-      @objects = @objects.uniq(&:id)
-    end
     if params[:eager] and params[:eager] != '0' and params[:eager] != 0 and params[:eager] != ''
       @objects.each(&:eager_load_associations)
     end
index 16df210fe83cc12096549f5609fafeecdcc09340..67d46f6716b2ce22b09cb6b1204b08fdb3c3dd96 100644 (file)
@@ -149,6 +149,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                // SSL certificates. See
                // http://www.w3.org/TR/cors/#user-credentials).
                w.Header().Set("Access-Control-Allow-Origin", "*")
+               w.Header().Set("Access-Control-Expose-Headers", "Content-Range")
        }
 
        arv := h.clientPool.Get()
index 0fe773a59e278b93264bf1d63457a14d9b709ef8..0ab3e969a0ebaa3f0d007e0c764a1e7507f6ec8a 100644 (file)
@@ -133,7 +133,7 @@ func init() {
                &s3UnsafeDelete,
                "s3-unsafe-delete",
                false,
-               "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
+               "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
 }
 
 // S3Volume implements Volume using an S3 bucket.
index 988b83c142b15d6a62058ec05ee0863d03476503..565db6601f18e68f5f621e0838ee06e051038028 100644 (file)
@@ -82,17 +82,20 @@ class BaseNodeManagerActor(pykka.ThreadingActor):
     def __init__(self, *args, **kwargs):
          super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
          self.actor_ref = TellableActorRef(self)
+         self._killfunc = kwargs.get("killfunc", os.kill)
 
     def on_failure(self, exception_type, exception_value, tb):
         lg = getattr(self, "_logger", logging)
         if (exception_type in (threading.ThreadError, MemoryError) or
             exception_type is OSError and exception_value.errno == errno.ENOMEM):
             lg.critical("Unhandled exception is a fatal error, killing Node Manager")
-            os.kill(os.getpid(), signal.SIGKILL)
+            self._killfunc(os.getpid(), signal.SIGKILL)
 
     def ping(self):
         return True
 
+    def get_thread(self):
+        return threading.current_thread()
 
 class WatchdogActor(pykka.ThreadingActor):
     def __init__(self, timeout, *args, **kwargs):
@@ -101,12 +104,13 @@ class WatchdogActor(pykka.ThreadingActor):
          self.actors = [a.proxy() for a in args]
          self.actor_ref = TellableActorRef(self)
          self._later = self.actor_ref.tell_proxy()
+         self._killfunc = kwargs.get("killfunc", os.kill)
 
     def kill_self(self, e, act):
         lg = getattr(self, "_logger", logging)
         lg.critical("Watchdog exception", exc_info=e)
         lg.critical("Actor %s watchdog ping time out, killing Node Manager", act)
-        os.kill(os.getpid(), signal.SIGKILL)
+        self._killfunc(os.getpid(), signal.SIGKILL)
 
     def on_start(self):
         self._later.run()
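
Injecting the kill function (defaulting to os.kill) lets tests verify the fatal-error path sends SIGKILL without actually killing the test process. A sketch of how a test might use it, assuming constructor kwargs are forwarded as in the code above:

    import os, signal

    killed = []
    actor = BaseNodeManagerActor.start(
        killfunc=lambda pid, sig: killed.append((pid, sig)))
    # ...provoke a MemoryError inside the actor, then:
    # assert killed == [(os.getpid(), signal.SIGKILL)]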
index fb9a6bf2142d58a63e0eba8d993e4fa2cb5e8ddb..c5dd1adef1f3173446d7c5efb3d8fbfc31d9d771 100644 (file)
@@ -240,6 +240,9 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         return super(ComputeNodeShutdownActor, self)._finished()
 
     def cancel_shutdown(self, reason, **kwargs):
+        if self.cancel_reason is not None:
+            # already cancelled
+            return
         self.cancel_reason = reason
         self._logger.info("Shutdown cancelled: %s.", reason)
         self._finished(success_flag=False)
@@ -257,6 +260,9 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     @_cancel_on_exception
     def shutdown_node(self):
+        if self.cancel_reason is not None:
+            # already cancelled
+            return
         if self.cancellable:
             self._logger.info("Checking that node is still eligible for shutdown")
             eligible, reason = self._monitor.shutdown_eligible().get()
index fa56578cffa1108526584ded9730a9cb5ffbbda9..c8883c3ae70f6614b7bd9063030c14e264dfe543 100644 (file)
@@ -73,7 +73,7 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
         if output in ("drng\n", "alloc\n", "drng*\n", "alloc*\n"):
             self._timer.schedule(time.time() + 10,
                                  self._later.await_slurm_drain)
-        elif output in ("idle\n"):
+        elif output in ("idle\n",):
             # Not in "drng" but idle, don't shut down
             self.cancel_shutdown("slurm state is %s" % output.strip(), try_resume=False)
         else:
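
The trailing comma is the whole fix: `("idle\n")` is only a parenthesized string, so `output in ("idle\n")` performs a substring test and matches many unintended values, whereas `("idle\n",)` is a one-element tuple and `in` then means exact membership:

    'e\n' in ('idle\n')      # True: parentheses are a no-op, this is a substring test
    'e\n' in ('idle\n',)     # False: with the comma, it is tuple membership
    'idle\n' in ('idle\n',)  # True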
index 4d2a1394df2b69b3823b8120524e06af7bcb7cb5..e7e3f25fe383239c310f72e2bd234cefc46f9619 100644 (file)
@@ -19,11 +19,15 @@ class TimedCallBackActor(actor_class):
     message at a later time.  This actor runs the necessary event loop for
     delivery.
     """
-    def __init__(self, max_sleep=1):
+    def __init__(self, max_sleep=1, timefunc=None):
         super(TimedCallBackActor, self).__init__()
         self._proxy = self.actor_ref.tell_proxy()
         self.messages = []
         self.max_sleep = max_sleep
+        if timefunc is None:
+            self._timefunc = time.time
+        else:
+            self._timefunc = timefunc
 
     def schedule(self, delivery_time, receiver, *args, **kwargs):
         if not self.messages:
@@ -33,7 +37,7 @@ class TimedCallBackActor(actor_class):
     def deliver(self):
         if not self.messages:
             return
-        til_next = self.messages[0][0] - time.time()
+        til_next = self.messages[0][0] - self._timefunc()
         if til_next <= 0:
             t, receiver, args, kwargs = heapq.heappop(self.messages)
             try:
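
Making the clock injectable (timefunc, defaulting to time.time) lets tests drive delivery deterministically instead of sleeping through real time. A sketch of a test using a fake clock, where receiver stands for any callable the test supplies:

    now = [1000.0]
    actor = TimedCallBackActor.start(timefunc=lambda: now[0]).proxy()
    actor.schedule(1000.5, receiver).get()  # not due yet at fake time 1000.0
    now[0] = 1001.0                         # advance the clock; the next deliver() fires it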
index 59d95e3d22f1231030359141298d79f2a547ad8b..d083bf168b50a54087ad398d925e007eab972713 100644 (file)
@@ -33,7 +33,7 @@ setup(name='arvados-node-manager',
       ],
       install_requires=[
           'apache-libcloud>=0.20',
-          'arvados-python-client>=0.1.20150206225333',
+          'arvados-python-client>=0.1.20170731145219',
           'future',
           'pykka',
           'python-daemon',
diff --git a/services/nodemanager/tests/stress_test.cwl b/services/nodemanager/tests/stress_test.cwl
new file mode 100644 (file)
index 0000000..082df64
--- /dev/null
@@ -0,0 +1,51 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+#
+#
+# Usage: arvados-cwl-runner stress_test.cwl
+#
+# Submits 100 jobs or containers, creating load on node manager and
+# scheduler.
+
+class: Workflow
+cwlVersion: v1.0
+requirements:
+  ScatterFeatureRequirement: {}
+  InlineJavascriptRequirement: {}
+inputs: []
+outputs: []
+steps:
+  step1:
+    in: []
+    out: [out]
+    run:
+      class: ExpressionTool
+      inputs: []
+      outputs:
+        out: int[]
+      expression: |
+        ${
+          var r = [];
+          for (var i = 1; i <= 100; i++) {
+            r.push(i);
+          }
+          return {out: r};
+        }
+  step2:
+    in:
+      num: step1/out
+    out: []
+    scatter: num
+    run:
+      class: CommandLineTool
+      requirements:
+        ShellCommandRequirement: {}
+      inputs:
+        num: int
+      outputs: []
+      arguments: [echo, "starting",
+        {shellQuote: false, valueFrom: "&&"},
+        sleep, $((101-inputs.num)*2),
+        {shellQuote: false, valueFrom: "&&"},
+        echo, "the number of the day is", $(inputs.num)]
index a8aa2e38fb46ce2c3e3b0d2ae7f35f01b12e4952..c44305d2b96a66a4cf6ddedf45556f3c58085532 100644 (file)
@@ -100,6 +100,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
             ]
         self.make_actor()
         self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.setup_actor.ping().get(self.TIMEOUT)
         self.assertEqual(1, self.cloud_client.post_create_node.call_count)
 
     def test_instance_exceeded_not_retried(self):
@@ -151,6 +152,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
         self.api_client.nodes().create().execute.side_effect = retry_resp
         self.api_client.nodes().update().execute.side_effect = retry_resp
         self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.setup_actor.ping().get(self.TIMEOUT)
         self.assertEqual(self.setup_actor.actor_ref.actor_urn,
                          subscriber.call_args[0][0].actor_ref.actor_urn)
 
@@ -207,17 +209,19 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
         self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
         self.cloud_client.destroy_node.return_value = True
         self.make_actor(cancellable=True)
-        self.check_success_flag(False)
+        self.check_success_flag(False, 2)
         self.assertFalse(self.cloud_client.destroy_node.called)
 
     def test_uncancellable_shutdown(self, *mocks):
         self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
         self.cloud_client.destroy_node.return_value = True
         self.make_actor(cancellable=False)
-        self.check_success_flag(True, 2)
+        self.check_success_flag(True, 4)
         self.assertTrue(self.cloud_client.destroy_node.called)
 
     def test_arvados_node_cleaned_after_shutdown(self, *mocks):
+        if len(mocks) == 1:
+            mocks[0].return_value = "drain\n"
         cloud_node = testutil.cloud_node_mock(62)
         arv_node = testutil.arvados_node_mock(62)
         self.make_mocks(cloud_node, arv_node)
@@ -235,12 +239,15 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
         self.assertTrue(update_mock().execute.called)
 
     def test_arvados_node_not_cleaned_after_shutdown_cancelled(self, *mocks):
+        if len(mocks) == 1:
+            mocks[0].return_value = "idle\n"
         cloud_node = testutil.cloud_node_mock(61)
         arv_node = testutil.arvados_node_mock(61)
         self.make_mocks(cloud_node, arv_node, shutdown_open=False)
         self.cloud_client.destroy_node.return_value = False
         self.make_actor(cancellable=True)
         self.shutdown_actor.cancel_shutdown("test")
+        self.shutdown_actor.ping().get(self.TIMEOUT)
         self.check_success_flag(False, 2)
         self.assertFalse(self.arvados_client.nodes().update.called)
 
@@ -338,13 +345,11 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
     def test_in_state_when_no_state_available(self):
         self.make_actor(arv_node=testutil.arvados_node_mock(
                 crunch_worker_state=None))
-        print(self.node_actor.get_state().get())
         self.assertTrue(self.node_state('idle'))
 
     def test_in_state_when_no_state_available_old(self):
         self.make_actor(arv_node=testutil.arvados_node_mock(
                 crunch_worker_state=None, age=90000))
-        print(self.node_actor.get_state().get())
         self.assertTrue(self.node_state('down'))
 
     def test_in_idle_state(self):
index c7eb7afc631cd2b06e2c7e6753b2e696ab27b520..0b6162dfaa64405df53794bb575bfffd2420bbff 100644 (file)
@@ -32,13 +32,20 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
             self.timer = testutil.MockTimer(False)
         self.make_actor()
         self.check_success_flag(None, 0)
+        # At this point, 1st try should have happened.
+
         self.timer.deliver()
         self.check_success_flag(None, 0)
-        self.timer.deliver()
+        # At this point, 2nd try should have happened.
+
         # Order is critical here: if the mock gets called when no return value
         # or side effect is set, we may invoke a real subprocess.
         proc_mock.return_value = end_state
         proc_mock.side_effect = None
+
+        # 3rd try
+        self.timer.deliver()
+
         self.check_success_flag(True, 3)
         self.check_slurm_got_args(proc_mock, 'NodeName=compute63')
 
@@ -67,20 +74,18 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         self.check_success_flag(True)
         self.assertFalse(proc_mock.called)
 
-    def test_node_undrained_when_shutdown_cancelled(self, proc_mock):
+    def test_node_resumed_when_shutdown_cancelled(self, proc_mock):
         try:
             proc_mock.side_effect = iter(['', 'drng\n', 'drng\n', ''])
             self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
             self.timer = testutil.MockTimer(False)
             self.make_actor()
             self.busywait(lambda: proc_mock.call_args is not None)
-            self.shutdown_actor.cancel_shutdown("test").get(self.TIMEOUT)
+            self.shutdown_actor.cancel_shutdown("test")
             self.check_success_flag(False, 2)
-            self.assertEqual(proc_mock.call_args_list,
-                             [mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']),
-                              mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
-                              mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
-                              mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME'])])
+            self.assertEqual(proc_mock.call_args_list[0], mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']))
+            self.assertEqual(proc_mock.call_args_list[-1], mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME']))
+
         finally:
             self.shutdown_actor.actor_ref.stop()
 
@@ -88,10 +93,10 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n', 'idle\n'])
         self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
         self.make_actor()
-        self.check_success_flag(False, 2)
+        self.check_success_flag(False, 5)
 
     def test_issue_slurm_drain_retry(self, proc_mock):
-        proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
+        proc_mock.side_effect = iter([OSError, OSError, 'drng\n', 'drain\n'])
         self.check_success_after_reset(proc_mock, timer=False)
 
     def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
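
The ordering note in the first hunk above depends on how mock treats an exhausted side_effect iterator. A minimal, self-contained sketch of that behavior (standard unittest.mock semantics; the values are illustrative, not taken from the fixtures):

    import mock  # Python 2; on Python 3: from unittest import mock

    proc_mock = mock.Mock()
    proc_mock.side_effect = iter(['drng\n'])
    assert proc_mock() == 'drng\n'   # consumes the only iterator value
    # Another call now would raise StopIteration, so install the fallback
    # before the next timer tick triggers another call:
    proc_mock.return_value = 'drain\n'
    proc_mock.side_effect = None     # cleared; return_value applies again
    assert proc_mock() == 'drain\n'
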
index f714c3c8b38761c0f38e46b70d7a12fd5124bc80..1efa1ffeb35199c251d13e217f2cb37c146c4622 100644 (file)
@@ -21,6 +21,15 @@ import logging
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
+
+    def busywait(self, f):
+        n = 0
+        while not f() and n < 200:
+            time.sleep(.1)
+            self.daemon.ping().get(self.TIMEOUT)
+            n += 1
+        self.assertTrue(f())
+
     def mock_node_start(self, **kwargs):
         # Make sure that every time the daemon starts a setup actor,
         # it gets a new mock object back.
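
Many hunks below replace a stop_proxy()-then-assert sequence with busywait(). The shift, sketched here as a hypothetical free-standing helper (poll_until and its arguments are illustrative, not repo API), is to poll the condition while the actor keeps running instead of asserting against a snapshot taken at shutdown:

    import time

    def poll_until(predicate, tries=200, interval=0.1, tick=None):
        # Poll until predicate() is true or the retry budget runs out.
        for _ in range(tries):
            if predicate():
                return True
            if tick is not None:
                tick()           # e.g. ping the actor so its mailbox drains
            time.sleep(interval)
        return predicate()

The busywait() defined above does the same thing, pinging the daemon between polls so queued messages get processed before the next check.
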
@@ -102,14 +111,16 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
 
     def monitor_list(self):
-        return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
+        return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
 
-    def monitored_arvados_nodes(self):
+    def monitored_arvados_nodes(self, include_unpaired=True):
         pairings = []
         for future in [actor.proxy().arvados_node
                        for actor in self.monitor_list()]:
             try:
-                pairings.append(future.get(self.TIMEOUT))
+                g = future.get(self.TIMEOUT)
+                if g or include_unpaired:
+                    pairings.append(g)
             except pykka.ActorDeadError:
                 pass
         return pairings
@@ -117,6 +128,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def alive_monitor_count(self):
         return len(self.monitored_arvados_nodes())
 
+    def paired_monitor_count(self):
+        return len(self.monitored_arvados_nodes(False))
+
     def assertShutdownCancellable(self, expected=True):
         self.assertTrue(self.node_shutdown.start.called)
         self.assertIs(expected,
@@ -126,17 +140,16 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_easy_node_creation(self):
         size = testutil.MockSize(1)
         self.make_daemon(want_sizes=[size])
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: self.node_setup.start.called)
 
     def check_monitors_arvados_nodes(self, *arv_nodes):
+        self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
         self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
 
     def test_node_pairing(self):
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon([cloud_node], [arv_node])
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_node_pairing_after_arvados_update(self):
@@ -145,7 +158,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          [testutil.arvados_node_mock(1, ip_address=None)])
         arv_node = testutil.arvados_node_mock(2)
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_arvados_node_un_and_re_paired(self):
@@ -157,9 +169,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
         self.check_monitors_arvados_nodes(arv_node)
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.assertEqual(0, self.alive_monitor_count())
+        self.busywait(lambda: 0 == self.alive_monitor_count())
         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_old_arvados_node_not_double_assigned(self):
@@ -179,8 +190,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_node_count_satisfied(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
                          want_sizes=[testutil.MockSize(1)])
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_dont_count_missing_as_busy(self):
         size = testutil.MockSize(1)
@@ -191,8 +201,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                             2,
                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size, size])
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: 2 == self.alive_monitor_count())
+        self.busywait(lambda: self.node_setup.start.called)
 
     def test_missing_counts_towards_max(self):
         size = testutil.MockSize(1)
@@ -202,8 +212,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size, size],
                          max_nodes=2)
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_excess_counts_missing(self):
         size = testutil.MockSize(1)
@@ -212,7 +221,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.busywait(lambda: 2 == self.paired_monitor_count())
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -224,7 +233,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-
+        self.busywait(lambda: 2 == self.paired_monitor_count())
         get_cloud_node = mock.MagicMock(name="get_cloud_node")
         get_cloud_node.get.return_value = cloud_nodes[1]
         mock_node_monitor = mock.MagicMock()
@@ -233,10 +242,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
 
-        self.assertEqual(2, self.alive_monitor_count())
+        self.busywait(lambda: 2 == self.alive_monitor_count())
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
-        self.assertEqual(1, self.node_shutdown.start.call_count)
+        self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
 
     def test_booting_nodes_counted(self):
         cloud_node = testutil.cloud_node_mock(1)
@@ -246,17 +255,15 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.max_nodes.get(self.TIMEOUT)
         self.assertTrue(self.node_setup.start.called)
         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
 
     def test_boot_new_node_when_all_nodes_busy(self):
         size = testutil.MockSize(2)
         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
                          [size], avail_sizes=[(size, {"cores":1})])
+        self.busywait(lambda: 1 == self.paired_monitor_count())
         self.busywait(lambda: self.node_setup.start.called)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
 
     def test_boot_new_node_below_min_nodes(self):
         min_size = testutil.MockSize(1)
@@ -402,7 +409,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         now = time.time()
         self.monitor_list()[0].tell_proxy().consider_shutdown()
         self.busywait(lambda: self.node_shutdown.start.called)
-        self.stop_proxy(self.daemon)
         self.assertShutdownCancellable(False)
 
     def test_booted_node_shut_down_when_never_paired(self):
@@ -414,7 +420,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_cloud_nodes([cloud_node])
         self.monitor_list()[0].tell_proxy().consider_shutdown()
         self.busywait(lambda: self.node_shutdown.start.called)
-        self.stop_proxy(self.daemon)
         self.assertShutdownCancellable(False)
 
     def test_booted_node_shut_down_when_never_working(self):
@@ -427,7 +432,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
         self.daemon.update_cloud_nodes([cloud_node])
         self.busywait(lambda: self.node_shutdown.start.called)
-        self.stop_proxy(self.daemon)
         self.assertShutdownCancellable(False)
 
     def test_node_that_pairs_not_considered_failed_boot(self):
@@ -457,8 +461,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_booting_nodes_shut_down(self):
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
+        self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
 
     def test_all_booting_nodes_tried_to_shut_down(self):
         size = testutil.MockSize(2)
@@ -483,7 +486,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(1)
         size = testutil.MockSize(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.paired_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -493,7 +496,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.paired_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -501,11 +504,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_shutdown_accepted_below_capacity(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_shutdown.start.called)
+        self.busywait(lambda: self.node_shutdown.start.called)
 
     def test_shutdown_declined_when_idle_and_job_queued(self):
         size = testutil.MockSize(1)
@@ -513,7 +515,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                      testutil.arvados_node_mock(4, job_uuid=None)]
         self.make_daemon(cloud_nodes, arv_nodes, [size])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.busywait(lambda: 2 == self.paired_monitor_count())
         for mon_ref in self.monitor_list():
             monitor = mon_ref.proxy()
             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
@@ -532,13 +534,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = False
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.paired_monitor_count())
 
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = True
         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.assertEqual(0, self.alive_monitor_count())
+        self.busywait(lambda: 0 == self.paired_monitor_count())
 
     def test_nodes_shutting_down_replaced_below_max_nodes(self):
         size = testutil.MockSize(6)
@@ -551,21 +553,19 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.node_shutdown.start.called)
         self.daemon.update_server_wishlist(
             [testutil.MockSize(6)]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: self.node_setup.start.called)
 
     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
         cloud_node = testutil.cloud_node_mock(7)
         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                          max_nodes=1)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.paired_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.assertTrue(self.node_shutdown.start.called)
         self.daemon.update_server_wishlist(
             [testutil.MockSize(7)]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_nodes_shutting_down_count_against_excess(self):
         size = testutil.MockSize(8)
@@ -573,7 +573,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
         self.make_daemon(cloud_nodes, arv_nodes, [size],
                          avail_sizes=[(size, {"cores":1})])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.busywait(lambda: 2 == self.paired_monitor_count())
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -598,8 +598,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         size = testutil.MockSize(2)
         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
         self.timer.deliver()
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
 
     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
@@ -607,9 +606,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(
-            1, self.last_shutdown.stop.call_count)
+        self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
 
     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
@@ -620,8 +617,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         # the ActorDeadError.
         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.last_shutdown.stop.call_count)
+        self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
 
     def test_node_create_two_sizes(self):
         small = testutil.MockSize(1)
@@ -675,7 +671,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                         testutil.arvados_node_mock(3)],
                          want_sizes=[small, small, big],
                          avail_sizes=avail_sizes)
-
+        self.busywait(lambda: 3 == self.paired_monitor_count())
         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
 
         self.assertEqual(0, self.node_shutdown.start.call_count)
@@ -686,10 +682,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         booting = self.daemon.booting.get()
         cloud_nodes = self.daemon.cloud_nodes.get()
 
-        self.stop_proxy(self.daemon)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
 
-        self.assertEqual(1, self.node_setup.start.call_count)
-        self.assertEqual(1, self.node_shutdown.start.call_count)
+        self.stop_proxy(self.daemon)
 
         # booting a new big node
         sizecounts = {a[0].id: 0 for a in avail_sizes}
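
monitor_list() now walks the daemon's own record of cloud nodes instead of asking the global pykka.ActorRegistry for every ComputeNodeMonitorActor. A rough equivalent as a stand-alone helper (hypothetical name; mirrors the list comprehension above):

    def monitors_of(daemon, timeout=5):
        # Scope the query to this daemon instance, so monitor actors leaked
        # into the global registry by other tests cannot skew the count.
        records = daemon.cloud_nodes.get(timeout).nodes.values()
        return [rec.actor.actor_ref for rec in records if rec.actor]
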
index cfac61ba2eaf64c67cf44aeb298f2caf4d9b1f86..ef4423dafaf7762b5d8a8c95fdbaacf630156917 100644 (file)
@@ -19,8 +19,8 @@ from . import testutil
 import arvnodeman.baseactor
 
 class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
-    def __init__(self, e):
-        super(BogusActor, self).__init__()
+    def __init__(self, e, killfunc=None):
+        super(BogusActor, self).__init__(killfunc=killfunc)
         self.exp = e
 
     def doStuff(self):
@@ -29,30 +29,35 @@ class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
     def ping(self):
         # Called by WatchdogActorTest, this delay is longer than the test timeout
         # of 1 second, which should cause the watchdog ping to fail.
-        time.sleep(4)
+        time.sleep(2)
         return True
 
 class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
     def test_fatal_error(self):
         for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
-            with mock.patch('os.kill') as kill_mock:
-                act = BogusActor.start(e).tell_proxy()
-                act.doStuff()
-                act.actor_ref.stop(block=True)
-                self.assertTrue(kill_mock.called)
-
-    @mock.patch('os.kill')
-    def test_nonfatal_error(self, kill_mock):
-        act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+            kill_mock = mock.Mock(name='os.kill')
+            bgact = BogusActor.start(e, killfunc=kill_mock)
+            act_thread = bgact.proxy().get_thread().get()
+            act = bgact.tell_proxy()
+            act.doStuff()
+            act.actor_ref.stop(block=True)
+            act_thread.join()
+            self.assertTrue(kill_mock.called)
+
+    def test_nonfatal_error(self):
+        kill_mock = mock.Mock(name='os.kill')
+        act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
         act.doStuff()
         act.actor_ref.stop(block=True)
         self.assertFalse(kill_mock.called)
 
 class WatchdogActorTest(testutil.ActorTestMixin, unittest.TestCase):
-    @mock.patch('os.kill')
-    def test_time_timout(self, kill_mock):
+
+    def test_time_timeout(self):
+        kill_mock = mock.Mock(name='os.kill')
         act = BogusActor.start(OSError(errno.ENOENT, ""))
-        watch = arvnodeman.baseactor.WatchdogActor.start(1, act)
+        watch = arvnodeman.baseactor.WatchdogActor.start(1, act, killfunc=kill_mock)
+        time.sleep(1)
         watch.stop(block=True)
         act.stop(block=True)
         self.assertTrue(kill_mock.called)
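
Both test classes above now inject the kill function rather than patching os.kill globally. A minimal sketch of the pattern (class and method names are hypothetical; the real BaseNodeManagerActor interface may differ):

    import os
    import signal

    class SelfTerminatingActor(object):
        def __init__(self, killfunc=None):
            # Default to the real os.kill; tests pass mock.Mock(name='os.kill').
            self._kill = killfunc if killfunc is not None else os.kill

        def on_failure(self, exception_type, exception_value, traceback):
            if isinstance(exception_value, MemoryError):
                # Treat out-of-memory as fatal for the whole process.
                self._kill(os.getpid(), signal.SIGKILL)

Injecting the dependency keeps the mock local to one actor, so one test's patch cannot leak into another running concurrently.
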
index 669b6247114c0f4843f5c2dd51eb9f9d4c00d4a9..b1d5e002767a000d7487aa82c8ee5bb9c312e320 100644 (file)
@@ -157,7 +157,6 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
     @mock.patch("subprocess.check_call")
     @mock.patch("subprocess.check_output")
     def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
-        #mock_scancel.return_value = ""
         job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
         container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
         mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
@@ -165,6 +164,7 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
         self.build_monitor([{'items': [{'uuid': job_uuid}]}],
                            self.MockCalculatorUnsatisfiableJobs(), True, True)
         self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+        self.monitor.ping().get(self.TIMEOUT)
         self.stop_proxy(self.monitor)
         self.client.jobs().cancel.assert_called_with(uuid=job_uuid)
         mock_scancel.assert_called_with(['scancel', '--name='+container_uuid])
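
The added ping().get(self.TIMEOUT) works as a mailbox barrier: a pykka actor handles messages one at a time in arrival order, so once the ping's future resolves, every message sent to the actor before it has been processed. Made explicit as a hypothetical helper:

    def drain_mailbox(actor_proxy, timeout=5):
        # Returns only after the actor has handled everything queued
        # ahead of this ping.
        actor_proxy.ping().get(timeout)
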
index cee7fe1c335247b6aba73df9abed3362b53c402c..21a9b5ac778651c58084b31001bda8c56a9ef9ed 100644 (file)
@@ -26,27 +26,29 @@ class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
 
     def test_delayed_turnaround(self):
         receiver = mock.Mock()
-        with mock.patch('time.time', return_value=0) as mock_now:
-            deliverer = timedcallback.TimedCallBackActor.start().proxy()
-            deliverer.schedule(1, receiver, 'delayed')
-            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
-            self.assertFalse(receiver.called)
-            mock_now.return_value = 2
-            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
-            self.stop_proxy(deliverer)
+        mock_now = mock.Mock()
+        mock_now.return_value = 0
+        deliverer = timedcallback.TimedCallBackActor.start(timefunc=mock_now).proxy()
+        deliverer.schedule(1, receiver, 'delayed')
+        deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+        self.assertFalse(receiver.called)
+        mock_now.return_value = 2
+        deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+        self.stop_proxy(deliverer)
         receiver.assert_called_with('delayed')
 
     def test_out_of_order_scheduling(self):
         receiver = mock.Mock()
-        with mock.patch('time.time', return_value=1.5) as mock_now:
-            deliverer = timedcallback.TimedCallBackActor.start().proxy()
-            deliverer.schedule(2, receiver, 'second')
-            deliverer.schedule(1, receiver, 'first')
-            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
-            receiver.assert_called_with('first')
-            mock_now.return_value = 2.5
-            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
-            self.stop_proxy(deliverer)
+        mock_now = mock.Mock()
+        mock_now.return_value = 1.5
+        deliverer = timedcallback.TimedCallBackActor.start(timefunc=mock_now).proxy()
+        deliverer.schedule(2, receiver, 'second')
+        deliverer.schedule(1, receiver, 'first')
+        deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+        receiver.assert_called_with('first')
+        mock_now.return_value = 2.5
+        deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+        self.stop_proxy(deliverer)
         receiver.assert_called_with('second')
 
     def test_dead_actors_ignored(self):
@@ -61,4 +63,3 @@ class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
 
 if __name__ == '__main__':
     unittest.main()
-
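
These tests now hand the actor a fake clock via timefunc instead of patching time.time, which is only reliable when the patched code runs in the patching thread. A rough, self-contained sketch of a scheduler built around an injected time source (TinyScheduler is illustrative; the real TimedCallBackActor is a pykka actor, and its delivery details may differ):

    import time

    class TinyScheduler(object):
        def __init__(self, timefunc=time.time):
            self._now = timefunc
            self._pending = []   # list of (deadline, callback, args)

        def schedule(self, deadline, callback, *args):
            self._pending.append((deadline, callback, args))
            self._deliver()      # each schedule() call flushes due events

        def _deliver(self):
            now = self._now()
            due = [p for p in self._pending if p[0] <= now]
            self._pending = [p for p in self._pending if p[0] > now]
            for _, callback, args in sorted(due, key=lambda p: p[0]):
                callback(*args)

As in the tests, advancing the injected clock and scheduling one more event is enough to deliver everything that has come due.
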
index 0a483709adf5b69a8fd13647f9e9bd866fa836cf..6e134375bb8aec05bdd71f830e28f277d3cff5b5 100644 (file)
@@ -123,7 +123,10 @@ class ActorTestMixin(object):
         pykka.ActorRegistry.stop_all()
 
     def stop_proxy(self, proxy):
-        return proxy.actor_ref.stop(timeout=self.TIMEOUT)
+        th = proxy.get_thread().get()
+        t = proxy.actor_ref.stop(timeout=self.TIMEOUT)
+        th.join()
+        return t
 
     def wait_for_assignment(self, proxy, attr_name, unassigned=None,
                             timeout=TIMEOUT):
@@ -136,11 +139,13 @@ class ActorTestMixin(object):
             if result is not unassigned:
                 return result
 
-    def busywait(self, f):
+    def busywait(self, f, finalize=None):
         n = 0
-        while not f() and n < 10:
+        while not f() and n < 20:
             time.sleep(.1)
             n += 1
+        if finalize is not None:
+            finalize()
         self.assertTrue(f())
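
stop_proxy() now joins the actor's thread after asking it to stop: pykka stops actors asynchronously, so without the join an assertion can still race the actor's final cleanup. get_thread() is the accessor used above on these actors. The same idea as a free function (hypothetical name):

    def stop_and_join(actor_proxy, timeout=5):
        thread = actor_proxy.get_thread().get()           # hosting thread
        stopped = actor_proxy.actor_ref.stop(timeout=timeout)
        thread.join()          # block until the actor thread really exits
        return stopped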
 
 
index edeb647e4628e675be696cb68f4b61892b4cc606..cfb828b2a5d84c6d16407866374e1f4900185f84 100644 (file)
@@ -248,7 +248,8 @@ func (ps *pgEventSource) DB() *sql.DB {
 }
 
 func (ps *pgEventSource) DBHealth() error {
-       ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+       defer cancel()
        var i int
        return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
 }
index f9f7f53edc58430f231e9a52d5d95bb1a025084a..d527c39ba1c4eeb12c0cbae63526150da27f096d 100644 (file)
@@ -60,6 +60,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // Receive websocket frames from the client and pass them to
        // sess.Receive().
        go func() {
+               defer cancel()
                buf := make([]byte, 2<<20)
                for {
                        select {
@@ -75,16 +76,14 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                err = errFrameTooBig
                        }
                        if err != nil {
-                               if err != io.EOF {
+                               if err != io.EOF && ctx.Err() == nil {
                                        log.WithError(err).Info("read error")
                                }
-                               cancel()
                                return
                        }
                        err = sess.Receive(buf)
                        if err != nil {
                                log.WithError(err).Error("sess.Receive() failed")
-                               cancel()
                                return
                        }
                }
@@ -94,6 +93,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // sess.EventMessage() as needed, and send them to the client
        // as websocket frames.
        go func() {
+               defer cancel()
                for {
                        var ok bool
                        var data interface{}
@@ -119,8 +119,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                buf, err = sess.EventMessage(e)
                                if err != nil {
                                        log.WithError(err).Error("EventMessage failed")
-                                       cancel()
-                                       break
+                                       return
                                } else if len(buf) == 0 {
                                        log.Debug("skip")
                                        continue
@@ -135,9 +134,10 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                        t0 := time.Now()
                        _, err = ws.Write(buf)
                        if err != nil {
-                               log.WithError(err).Error("write failed")
-                               cancel()
-                               break
+                               if ctx.Err() == nil {
+                                       log.WithError(err).Error("write failed")
+                               }
+                               return
                        }
                        log.Debug("sent")
 
@@ -159,6 +159,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // is done/cancelled or the incoming event stream ends. Shut
        // down the handler if the outgoing queue fills up.
        go func() {
+               defer cancel()
                ticker := time.NewTicker(h.PingTimeout)
                defer ticker.Stop()
 
@@ -178,10 +179,8 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                        default:
                                        }
                                }
-                               continue
                        case e, ok := <-incoming.Channel():
                                if !ok {
-                                       cancel()
                                        return
                                }
                                if !sess.Filter(e) {
@@ -191,7 +190,6 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                case queue <- e:
                                default:
                                        log.WithError(errQueueFull).Error("terminate")
-                                       cancel()
                                        return
                                }
                        }
index 58c64231cb53c1204ceed70b0ea030a7050ebb95..4fbfc489cf30fe0e56425e37d909c250f83d967d 100644 (file)
@@ -205,6 +205,10 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        // client will probably reconnect and do the
                        // same thing all over again.
                        time.Sleep(100 * time.Millisecond)
+                       if sess.ws.Request().Context().Err() != nil {
+                               // Session terminated while we were sleeping
+                               return
+                       }
                }
                now := time.Now()
                e := &event{
index b21e49e3539a3f8af1cbfe4a0d8ff0709c96f1fc..db9b64887fe325fd25fed9d452acf449524f9a1e 100644 (file)
@@ -65,14 +65,20 @@ RUN cd /root && \
     GOPATH=$PWD go get github.com/curoverse/runsvinit && \
     install bin/runsvinit /usr/local/bin
 
+ENV PJSVERSION=1.9.7
+
 RUN set -e && \
- PJS=phantomjs-1.9.7-linux-x86_64 && \
- curl -L -o/tmp/$PJS.tar.bz2 http://cache.arvados.org/$PJS.tar.bz2 && \
- tar -C /usr/local -xjf /tmp/$PJS.tar.bz2 && \
- ln -s ../$PJS/bin/phantomjs /usr/local/bin/
+ curl -L -f http://cache.arvados.org/phantomjs-${PJSVERSION}-linux-x86_64.tar.bz2 | tar -C /usr/local -xjf - && \
+ ln -s ../phantomjs-${PJSVERSION}-linux-x86_64/bin/phantomjs /usr/local/bin
 
 RUN pip install -U setuptools
 
+ENV NODEVERSION=v6.11.2
+
+# Install nodejs binary
+RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
+    ln -s ../node-${NODEVERSION}-linux-x64/bin/node ../node-${NODEVERSION}-linux-x64/bin/npm /usr/local/bin
+
 ARG arvados_version
 RUN echo arvados_version is git commit $arvados_version