Merge branch 'master' into 11898-no-distinct
author Tom Clegg <tclegg@veritasgenetics.com>
Tue, 29 Aug 2017 05:30:11 +0000 (01:30 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Tue, 29 Aug 2017 05:30:11 +0000 (01:30 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

81 files changed:
.gitignore
apps/workbench/app/controllers/collections_controller.rb
apps/workbench/app/controllers/healthcheck_controller.rb
apps/workbench/app/views/jobs/_show_log.html.erb
apps/workbench/config/application.default.yml
apps/workbench/config/application.yml.example
apps/workbench/config/load_config.rb
apps/workbench/test/controllers/collections_controller_test.rb
apps/workbench/test/controllers/healthcheck_controller_test.rb
apps/workbench/test/integration/anonymous_access_test.rb
apps/workbench/test/integration/collections_test.rb
apps/workbench/test/integration/jobs_test.rb
apps/workbench/test/integration_helper.rb
build/build.list
build/run-build-docker-images.sh
build/run-build-packages.sh
build/run-library.sh
build/run-tests.sh
lib/crunchstat/crunchstat.go
lib/crunchstat/crunchstat_test.go
sdk/cli/bin/crunch-job
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvdocker.py
sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/wf/expect_packed.cwl
sdk/cwl/tests/wf/scatter2_subwf.cwl
sdk/go/arvados/container.go
sdk/go/arvados/error.go
sdk/go/health/handler.go
sdk/perl/lib/Arvados/Request.pm
sdk/python/arvados/commands/put.py
sdk/python/arvados/commands/ws.py
sdk/python/arvados/util.py
sdk/python/tests/test_arv_put.py
services/api/Gemfile.lock
services/api/app/controllers/arvados/v1/healthcheck_controller.rb
services/api/config/application.default.yml
services/api/test/functional/arvados/v1/healthcheck_controller_test.rb
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/command.py
services/fuse/arvados_fuse/fresh.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/tests/test_mount.py
services/keep-web/handler.go
services/keepstore/s3_volume.go
services/nodemanager/arvnodeman/baseactor.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/arvnodeman/status.py
services/nodemanager/arvnodeman/timedcallback.py
services/nodemanager/setup.py
services/nodemanager/tests/integration_test.py
services/nodemanager/tests/stress_test.cwl [new file with mode: 0644]
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_computenode_dispatch_slurm.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_failure.py
services/nodemanager/tests/test_jobqueue.py
services/nodemanager/tests/test_status.py
services/nodemanager/tests/test_timedcallback.py
services/nodemanager/tests/testutil.py
services/ws/event_source.go
services/ws/handler.go
services/ws/session_v0.go
services/ws/session_v0_test.go
tools/arvbox/lib/arvbox/docker/Dockerfile.base
tools/crunchstat-summary/crunchstat_summary/command.py
tools/crunchstat-summary/crunchstat_summary/reader.py
tools/crunchstat-summary/crunchstat_summary/summarizer.py
tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz [new file with mode: 0644]
tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report [new file with mode: 0644]
tools/crunchstat-summary/tests/test_examples.py
vendor/vendor.json

index 77b98999bca91ede8ff5aab46f04473ba1f2cd99..0e876bb6f4d430eea2a79bbe34b0ee3f37c8a6fc 100644 (file)
@@ -27,3 +27,4 @@ sdk/cwl/arvados_cwl/_version.py
 services/api/config/arvados-clients.yml
 *#*
 .DS_Store
+.vscode
index f8fcf5108f025659bf5058f2861ef42d2e1b5781..5fcb2dc569ff6b2446c602dc26de61a069155ba2 100644 (file)
@@ -115,20 +115,10 @@ class CollectionsController < ApplicationController
   end
 
   def show_file_links
-    if Rails.configuration.keep_web_url || Rails.configuration.keep_web_download_url
-      # show_file will redirect to keep-web's directory listing
-      return show_file
-    end
-    Thread.current[:reader_tokens] = [params[:reader_token]]
-    return if false.equal?(find_object_by_uuid)
-    render layout: false
+    return show_file
   end
 
   def show_file
-    # We pipe from arv-get to send the file to the user.  Before we start it,
-    # we ask the API server if the file actually exists.  This serves two
-    # purposes: it lets us return a useful status code for common errors, and
-    # helps us figure out which token to provide to arv-get.
     # The order of searched tokens is important: because the anonymous user
     # token is passed along with every API request, we have to check it first.
     # Otherwise, it's impossible to know whether any other request succeeded
@@ -145,62 +135,18 @@ class CollectionsController < ApplicationController
       return
     end
 
-    # If we are configured to use a keep-web server, just redirect to
-    # the appropriate URL.
-    if Rails.configuration.keep_web_url or
-        Rails.configuration.keep_web_download_url
-      opts = {}
-      if usable_token == params[:reader_token]
-        opts[:path_token] = usable_token
-      elsif usable_token == Rails.configuration.anonymous_user_token
-        # Don't pass a token at all
-      else
-        # We pass the current user's real token only if it's necessary
-        # to read the collection.
-        opts[:query_token] = usable_token
-      end
-      opts[:disposition] = params[:disposition] if params[:disposition]
-      return redirect_to keep_web_url(params[:uuid], params[:file], opts)
-    end
-
-    # No keep-web server available. Get the file data with arv-get,
-    # and serve it through Rails.
-
-    file_name = params[:file].andand.sub(/^(\.\/|\/|)/, './')
-    if file_name.nil? or not coll.manifest.has_file?(file_name)
-      return render_not_found
-    end
-
-    opts = params.merge(arvados_api_token: usable_token)
-
-    # Handle Range requests. Currently we support only 'bytes=0-....'
-    if request.headers.include? 'HTTP_RANGE'
-      if m = /^bytes=0-(\d+)/.match(request.headers['HTTP_RANGE'])
-        opts[:maxbytes] = m[1]
-        size = params[:size] || '*'
-        self.response.status = 206
-        self.response.headers['Content-Range'] = "bytes 0-#{m[1]}/#{size}"
-      end
-    end
-
-    ext = File.extname(params[:file])
-    self.response.headers['Content-Type'] =
-      Rack::Mime::MIME_TYPES[ext] || 'application/octet-stream'
-    if params[:size]
-      size = params[:size].to_i
-      if opts[:maxbytes]
-        size = [size, opts[:maxbytes].to_i].min
-      end
-      self.response.headers['Content-Length'] = size.to_s
-    end
-    self.response.headers['Content-Disposition'] = params[:disposition] if params[:disposition]
-    begin
-      file_enumerator(opts).each do |bytes|
-        response.stream.write bytes
-      end
-    ensure
-      response.stream.close
+    opts = {}
+    if usable_token == params[:reader_token]
+      opts[:path_token] = usable_token
+    elsif usable_token == Rails.configuration.anonymous_user_token
+      # Don't pass a token at all
+    else
+      # We pass the current user's real token only if it's necessary
+      # to read the collection.
+      opts[:query_token] = usable_token
     end
+    opts[:disposition] = params[:disposition] if params[:disposition]
+    return redirect_to keep_web_url(params[:uuid], params[:file], opts)
   end
 
   def sharing_scopes
@@ -288,11 +234,7 @@ class CollectionsController < ApplicationController
 
   def download_link
     token = @search_sharing.first.api_token
-    if Rails.configuration.keep_web_url || Rails.configuration.keep_web_download_url
-      keep_web_url(@object.uuid, nil, {path_token: token})
-    else
-      collections_url + "/download/#{@object.uuid}/#{token}/"
-    end
+    keep_web_url(@object.uuid, nil, {path_token: token})
   end
 
   def share
@@ -468,43 +410,4 @@ class CollectionsController < ApplicationController
 
     uri.to_s
   end
-
-  # Note: several controller and integration tests rely on stubbing
-  # file_enumerator to return fake file content.
-  def file_enumerator opts
-    FileStreamer.new opts
-  end
-
-  class FileStreamer
-    include ArvadosApiClientHelper
-    def initialize(opts={})
-      @opts = opts
-    end
-    def each
-      return unless @opts[:uuid] && @opts[:file]
-
-      env = Hash[ENV].dup
-
-      require 'uri'
-      u = URI.parse(arvados_api_client.arvados_v1_base)
-      env['ARVADOS_API_HOST'] = "#{u.host}:#{u.port}"
-      env['ARVADOS_API_TOKEN'] = @opts[:arvados_api_token]
-      env['ARVADOS_API_HOST_INSECURE'] = "true" if Rails.configuration.arvados_insecure_https
-
-      bytesleft = @opts[:maxbytes].andand.to_i || 2**16
-      io = IO.popen([env, 'arv-get', "#{@opts[:uuid]}/#{@opts[:file]}"], 'rb')
-      while bytesleft > 0 && (buf = io.read([bytesleft, 2**16].min)) != nil
-        # shrink the bytesleft count, if we were given a maximum byte
-        # count to read
-        if @opts.include? :maxbytes
-          bytesleft = bytesleft - buf.length
-        end
-        yield buf
-      end
-      io.close
-      # "If ios is opened by IO.popen, close sets $?."
-      # http://www.ruby-doc.org/core-2.1.3/IO.html#method-i-close
-      Rails.logger.warn("#{@opts[:uuid]}/#{@opts[:file]}: #{$?}") if $? != 0
-    end
-  end
 end
index 8cf6b93b518076aaa01123880377db948dca3abd..60043d9024c223558cabc9cfc51a1d2522e6e1f4 100644 (file)
@@ -16,7 +16,7 @@ class HealthcheckController < ApplicationController
   before_filter :check_auth_header
 
   def check_auth_header
-    mgmt_token = Rails.configuration.management_token
+    mgmt_token = Rails.configuration.ManagementToken
     auth_header = request.headers['Authorization']
 
     if !mgmt_token
index b4ede751183206e8259afd6df196f7a368c2bb7e..821b4bcdf27de248d1b3b29a81e0fb4e18bad04b 100644 (file)
@@ -74,7 +74,7 @@ var makeFilter = function() {
     $("#log-viewer-download-pane").show();
     var headers = {};
     if (log_size > log_maxbytes) {
-      headers['Range'] = 'bytes=0-' + log_maxbytes;
+      headers['Range'] = 'bytes=0-' + (log_maxbytes - 1);
     }
     var ajax_opts = { dataType: 'text', headers: headers };
     load_log();
index 8b9bf15b78b5ba776f83e919f39506f1c2dc8891..da20573abb859b96a5202983ab9e77e1703ac25f 100644 (file)
@@ -99,6 +99,9 @@ test:
   profiling_enabled: true
   secret_token: <%= rand(2**256).to_s(36) %>
   secret_key_base: <%= rand(2**256).to_s(36) %>
+  # This setting allows Workbench to start when running tests; it should be
+  # set to a correct value when testing relevant features.
+  keep_web_url: http://example.com/c=%{uuid_or_pdh}
 
   # When you run the Workbench's integration tests, it starts the API
   # server as a dependency.  These settings should match the API
@@ -243,8 +246,8 @@ common:
   shell_in_a_box_url: false
 
   # Format of preview links. If false, use keep_web_download_url
-  # instead, and disable inline preview. If both are false, use
-  # Workbench's built-in file download/preview mechanism.
+  # instead, and disable inline preview.
+  # If both are false, Workbench won't start; this is a mandatory configuration.
   #
   # Examples:
   # keep_web_url: https://%{uuid_or_pdh}.collections.uuid_prefix.arvadosapi.com
@@ -298,4 +301,4 @@ common:
 
   # Token to be included in all healthcheck requests. Disabled by default.
   # Workbench expects request header of the format "Authorization: Bearer xxx"
-  management_token: false
+  ManagementToken: false
index 75edb8789d7afb26a82b628293be36e29dad9c8e..85df228f4855f755da2bf13fca31b6f85c689a0d 100644 (file)
@@ -23,6 +23,10 @@ development:
   arvados_v1_base: https://arvados.local:3030/arvados/v1
   arvados_insecure_https: true
 
+  # You need to configure at least one of these:
+  keep_web_url: false
+  keep_web_download_url: false
+
 production:
   # At minimum, you need a nice long randomly generated secret_token here.
   secret_token: ~
@@ -31,3 +35,7 @@ production:
   arvados_login_base: https://arvados.local:3030/login
   arvados_v1_base: https://arvados.local:3030/arvados/v1
   arvados_insecure_https: false
+
+  # You need to configure at least one of these:
+  keep_web_url: false
+  keep_web_download_url: false
index e2185a896963c23f9d2cd2c2036c885e196ae62f..d8d4dff567a45e63e27e744ab8b0f9a0da36c82b 100644 (file)
@@ -46,14 +46,26 @@ EOS
       cfg.send "#{k}=", v
     end
   end
-  if !nils.empty?
+  if !nils.empty? and not ::Rails.groups.include?('assets')
     raise <<EOS
+#{::Rails.groups.include?('assets')}
 Refusing to start in #{::Rails.env.to_s} mode with missing configuration.
 
 The following configuration settings must be specified in
 config/application.yml:
 * #{nils.join "\n* "}
 
+EOS
+  end
+  # Refuse to start if keep-web isn't configured
+  if not (config.keep_web_url or config.keep_web_download_url) and not ::Rails.groups.include?('assets')
+    raise <<EOS
+Refusing to start in #{::Rails.env.to_s} mode with missing configuration.
+
+Keep-web service must be configured in config/application.yml:
+* keep_web_url
+* keep_web_download_url
+
 EOS
   end
 end
index 26d6fda85e53f87bac69c47ea3e2e78639f584d0..773a4f45714b515d664ec290ecb61e32bcca5695 100644 (file)
@@ -23,15 +23,6 @@ class CollectionsControllerTest < ActionController::TestCase
       end
   end
 
-  def stub_file_content
-    # For the duration of the current test case, stub file download
-    # content with a randomized (but recognizable) string. Return the
-    # string, the test case can use it in assertions.
-    txt = 'the quick brown fox ' + rand(2**32).to_s
-    @controller.stubs(:file_enumerator).returns([txt])
-    txt
-  end
-
   def collection_params(collection_name, file_name=nil)
     uuid = api_fixture('collections')[collection_name.to_s]['uuid']
     params = {uuid: uuid, id: uuid}
@@ -75,17 +66,14 @@ class CollectionsControllerTest < ActionController::TestCase
   end
 
   test "download a file with spaces in filename" do
+    setup_for_keep_web
     collection = api_fixture('collections')['w_a_z_file']
-    fakepipe = IO.popen(['echo', '-n', 'w a z'], 'rb')
-    IO.expects(:popen).with { |cmd, mode|
-      cmd.include? "#{collection['uuid']}/w a z"
-    }.returns(fakepipe)
     get :show_file, {
       uuid: collection['uuid'],
       file: 'w a z'
     }, session_for(:active)
-    assert_response :success
-    assert_equal 'w a z', response.body
+    assert_response :redirect
+    assert_match /w%20a%20z/, response.redirect_url
   end
 
   test "viewing a collection fetches related projects" do
@@ -137,20 +125,18 @@ class CollectionsControllerTest < ActionController::TestCase
     params[:reader_token] = api_fixture("api_client_authorizations",
                                         "active_all_collections", "api_token")
     get(:show_file_links, params)
-    assert_response :success
-    assert_equal([['.', 'foo', 3]], assigns(:object).files)
+    assert_response :redirect
     assert_no_session
   end
 
   test "fetching collection file with reader token" do
-    expected = stub_file_content
+    setup_for_keep_web
     params = collection_params(:foo_file, "foo")
     params[:reader_token] = api_fixture("api_client_authorizations",
                                         "active_all_collections", "api_token")
     get(:show_file, params)
-    assert_response :success
-    assert_equal(expected, @response.body,
-                 "failed to fetch a Collection file with a reader token")
+    assert_response :redirect
+    assert_match /foo/, response.redirect_url
     assert_no_session
   end
 
@@ -163,24 +149,23 @@ class CollectionsControllerTest < ActionController::TestCase
   end
 
   test "getting a file from Keep" do
+    setup_for_keep_web
     params = collection_params(:foo_file, 'foo')
     sess = session_for(:active)
-    expect_content = stub_file_content
     get(:show_file, params, sess)
-    assert_response :success
-    assert_equal(expect_content, @response.body,
-                 "failed to get a correct file from Keep")
+    assert_response :redirect
+    assert_match /foo/, response.redirect_url
   end
 
   test 'anonymous download' do
+    setup_for_keep_web
     config_anonymous true
-    expect_content = stub_file_content
     get :show_file, {
       uuid: api_fixture('collections')['user_agreement_in_anonymously_accessible_project']['uuid'],
       file: 'GNU_General_Public_License,_version_3.pdf',
     }
-    assert_response :success
-    assert_equal expect_content, response.body
+    assert_response :redirect
+    assert_match /GNU_General_Public_License/, response.redirect_url
   end
 
   test "can't get a file from Keep without permission" do
@@ -190,22 +175,14 @@ class CollectionsControllerTest < ActionController::TestCase
     assert_response 404
   end
 
-  test "trying to get a nonexistent file from Keep returns a 404" do
-    params = collection_params(:foo_file, 'gone')
-    sess = session_for(:admin)
-    get(:show_file, params, sess)
-    assert_response 404
-  end
-
   test "getting a file from Keep with a good reader token" do
+    setup_for_keep_web
     params = collection_params(:foo_file, 'foo')
     read_token = api_fixture('api_client_authorizations')['active']['api_token']
     params[:reader_token] = read_token
-    expect_content = stub_file_content
     get(:show_file, params)
-    assert_response :success
-    assert_equal(expect_content, @response.body,
-                 "failed to get a correct file from Keep using a reader token")
+    assert_response :redirect
+    assert_match /foo/, response.redirect_url
     assert_not_equal(read_token, session[:arvados_api_token],
                      "using a reader token set the session's API token")
   end
@@ -229,25 +206,22 @@ class CollectionsControllerTest < ActionController::TestCase
   end
 
   test "can get a file with an unpermissioned auth but in-scope reader token" do
+    setup_for_keep_web
     params = collection_params(:foo_file, 'foo')
     sess = session_for(:expired)
     read_token = api_fixture('api_client_authorizations')['active']['api_token']
     params[:reader_token] = read_token
-    expect_content = stub_file_content
     get(:show_file, params, sess)
-    assert_response :success
-    assert_equal(expect_content, @response.body,
-                 "failed to get a correct file from Keep using a reader token")
+    assert_response :redirect
     assert_not_equal(read_token, session[:arvados_api_token],
                      "using a reader token set the session's API token")
   end
 
   test "inactive user can retrieve user agreement" do
+    setup_for_keep_web
     ua_collection = api_fixture('collections')['user_agreement']
     # Here we don't test whether the agreement can be retrieved from
-    # Keep. We only test that show_file decides to send file content,
-    # so we use the file content stub.
-    stub_file_content
+    # Keep. We only test that show_file decides to send file content.
     get :show_file, {
       uuid: ua_collection['uuid'],
       file: ua_collection['manifest_text'].match(/ \d+:\d+:(\S+)/)[1]
@@ -255,7 +229,7 @@ class CollectionsControllerTest < ActionController::TestCase
     assert_nil(assigns(:unsigned_user_agreements),
                "Did not skip check_user_agreements filter " +
                "when showing the user agreement.")
-    assert_response :success
+    assert_response :redirect
   end
 
   test "requesting nonexistent Collection returns 404" do
@@ -263,37 +237,12 @@ class CollectionsControllerTest < ActionController::TestCase
                     :active, 404)
   end
 
-  test "use a reasonable read buffer even if client requests a huge range" do
-    fakefiledata = mock
-    IO.expects(:popen).returns(fakefiledata)
-    fakefiledata.expects(:read).twice.with() do |length|
-      # Fail the test if read() is called with length>1MiB:
-      length < 2**20
-      ## Force the ActionController::Live thread to lose the race to
-      ## verify that @response.body.length actually waits for the
-      ## response (see below):
-      # sleep 3
-    end.returns("foo\n", nil)
-    fakefiledata.expects(:close)
-    foo_file = api_fixture('collections')['foo_file']
-    @request.headers['Range'] = 'bytes=0-4294967296/*'
-    get :show_file, {
-      uuid: foo_file['uuid'],
-      file: foo_file['manifest_text'].match(/ \d+:\d+:(\S+)/)[1]
-    }, session_for(:active)
-    # Wait for the whole response to arrive before deciding whether
-    # mocks' expectations were met. Otherwise, Mocha will fail the
-    # test depending on how slowly the ActionController::Live thread
-    # runs.
-    @response.body.length
-  end
-
   test "show file in a subdirectory of a collection" do
+    setup_for_keep_web
     params = collection_params(:collection_with_files_in_subdir, 'subdir2/subdir3/subdir4/file1_in_subdir4.txt')
-    expect_content = stub_file_content
     get(:show_file, params, session_for(:user1_with_load))
-    assert_response :success
-    assert_equal(expect_content, @response.body, "failed to get a correct file from Keep")
+    assert_response :redirect
+    assert_match /subdir2\/subdir3\/subdir4\/file1_in_subdir4\.txt/, response.redirect_url
   end
 
   test 'provenance graph' do
@@ -521,7 +470,6 @@ class CollectionsControllerTest < ActionController::TestCase
   def setup_for_keep_web cfg='https://%{uuid_or_pdh}.example', dl_cfg=false
     Rails.configuration.keep_web_url = cfg
     Rails.configuration.keep_web_download_url = dl_cfg
-    @controller.expects(:file_enumerator).never
   end
 
   %w(uuid portable_data_hash).each do |id_type|
index 9254593dc3354914b391740747440dc07f7eb221..9a63a29e8f9677ec8a53426b374e42d255996c56 100644 (file)
@@ -13,7 +13,7 @@ class HealthcheckControllerTest < ActionController::TestCase
     [true, 'Bearer configuredmanagementtoken', 200, '{"health":"OK"}'],
   ].each do |enabled, header, error_code, error_msg|
     test "ping when #{if enabled then 'enabled' else 'disabled' end} with header '#{header}'" do
-      Rails.configuration.management_token = 'configuredmanagementtoken' if enabled
+      Rails.configuration.ManagementToken = 'configuredmanagementtoken' if enabled
 
       @request.headers['Authorization'] = header
       get :ping
index 6141cb3ca1e87989bf7713120f8969f2d75c64a7..6971c39f3385f59674a681e7ccb38cf1f0d9f2b4 100644 (file)
@@ -5,6 +5,8 @@
 require 'integration_helper'
 
 class AnonymousAccessTest < ActionDispatch::IntegrationTest
+  include KeepWebConfig
+
   # These tests don't do state-changing API calls. Save some time by
   # skipping the database reset.
   reset_api_fixtures :after_each_test, false
@@ -117,10 +119,12 @@ class AnonymousAccessTest < ActionDispatch::IntegrationTest
   end
 
   test 'view file' do
+    use_keep_web_config
+
     magic = rand(2**512).to_s 36
-    CollectionsController.any_instance.stubs(:file_enumerator).returns([magic])
-    collection = api_fixture('collections')['public_text_file']
-    visit '/collections/' + collection['uuid']
+    owner = api_fixture('groups')['anonymously_accessible_project']['uuid']
+    col = upload_data_and_get_collection(magic, 'admin', "Hello\\040world.txt", owner)
+    visit '/collections/' + col.uuid
     find('tr,li', text: 'Hello world.txt').
       find('a[title~=View]').click
     assert_text magic
index 8619858dfeee90889cc411b2e06677caa2291134..71cfe38abfda32b2d5b5ce943ecdbf26f46ff52b 100644 (file)
@@ -6,6 +6,8 @@ require 'integration_helper'
 require_relative 'integration_test_utils'
 
 class CollectionsTest < ActionDispatch::IntegrationTest
+  include KeepWebConfig
+
   setup do
     need_javascript
   end
@@ -44,7 +46,7 @@ class CollectionsTest < ActionDispatch::IntegrationTest
   test "creating and uncreating a sharing link" do
     coll_uuid = api_fixture("collections", "collection_owned_by_active", "uuid")
     download_link_re =
-      Regexp.new(Regexp.escape("/collections/download/#{coll_uuid}/"))
+      Regexp.new(Regexp.escape("/c=#{coll_uuid}/"))
     visit page_with_token("active_trustedclient", "/collections/#{coll_uuid}")
     within "#sharing-button" do
       check_sharing(:on, download_link_re)
@@ -53,10 +55,20 @@ class CollectionsTest < ActionDispatch::IntegrationTest
   end
 
   test "can download an entire collection with a reader token" do
-    Capybara.current_driver = :rack_test
-    CollectionsController.any_instance.
-      stubs(:file_enumerator).returns(["foo\n", "file\n"])
-    uuid = api_fixture('collections')['foo_file']['uuid']
+    use_keep_web_config
+
+    token = api_fixture('api_client_authorizations')['active']['api_token']
+    data = "foo\nfile\n"
+    datablock = `echo -n #{data.shellescape} | ARVADOS_API_TOKEN=#{token.shellescape} arv-put --no-progress --raw -`.strip
+    assert $?.success?, $?
+
+    col = nil
+    use_token 'active' do
+      mtxt = ". #{datablock} 0:#{data.length}:foo\n"
+      col = Collection.create(manifest_text: mtxt)
+    end
+
+    uuid = col.uuid
     token = api_fixture('api_client_authorizations')['active_all_collections']['api_token']
     url_head = "/collections/download/#{uuid}/#{token}/"
     visit url_head
@@ -78,10 +90,8 @@ class CollectionsTest < ActionDispatch::IntegrationTest
     end
     assert_equal(['foo'], hrefs.compact.sort,
                  "download page did provide strictly file links")
-    within "#collection_files" do
-      click_link "foo"
-      assert_equal("foo\nfile\n", page.html)
-    end
+    click_link "foo"
+    assert_text "foo\nfile\n"
   end
 
   test "combine selected collections into new collection" do
index 8a60a84459243d01d31f4b564206464738f2a750..bfed03b14bd223085dd7fa704eb8494c951ec198 100644 (file)
@@ -39,37 +39,25 @@ class JobsTest < ActionDispatch::IntegrationTest
     assert_selector 'a[href="/"]', text: 'Go to dashboard'
   end
 
-  test "view job log" do
-    job = api_fixture('jobs')['job_with_real_log']
-
-    IO.expects(:popen).returns(fakepipe_with_log_data)
-
-    visit page_with_token("active", "/jobs/#{job['uuid']}")
-    assert page.has_text? job['script_version']
-
-    find(:xpath, "//a[@href='#Log']").click
-    wait_for_ajax
-    assert page.has_text? 'Started at'
-    assert page.has_text? 'Finished at'
-    assert page.has_text? 'log message 1'
-    assert page.has_text? 'log message 2'
-    assert page.has_text? 'log message 3'
-    assert page.has_no_text? 'Showing only 100 bytes of this log'
-  end
-
   test 'view partial job log' do
+    need_selenium 'to be able to see the CORS response headers (PhantomJS 1.9.8 does not)'
+    use_keep_web_config
+
     # This config will be restored during teardown by ../test_helper.rb:
     Rails.configuration.log_viewer_max_bytes = 100
 
-    IO.expects(:popen).returns(fakepipe_with_log_data)
-    job = api_fixture('jobs')['job_with_real_log']
-
-    visit page_with_token("active", "/jobs/#{job['uuid']}")
-    assert page.has_text? job['script_version']
-
-    find(:xpath, "//a[@href='#Log']").click
+    logdata = fakepipe_with_log_data.read
+    job_uuid = api_fixture('jobs')['running']['uuid']
+    logcollection = upload_data_and_get_collection(logdata, 'active', "#{job_uuid}.log.txt")
+    job = nil
+    use_token 'active' do
+      job = Job.find job_uuid
+      job.update_attributes log: logcollection.portable_data_hash
+    end
+    visit page_with_token 'active', '/jobs/'+job.uuid
+    find('a[href="#Log"]').click
     wait_for_ajax
-    assert page.has_text? 'Showing only 100 bytes of this log'
+    assert_text 'Showing only 100 bytes of this log'
   end
 
   test 'view log via keep-web redirect' do
index 5d2cefe447c4fb84e29eae4dc93bea6fd3748d90..ef2779cc3e78eedb556ce2dc7114a6e2466112b0 100644 (file)
@@ -163,7 +163,6 @@ module KeepWebConfig
     @kwdport = getport 'keep-web-dl-ssl'
     Rails.configuration.keep_web_url = "https://localhost:#{@kwport}/c=%{uuid_or_pdh}"
     Rails.configuration.keep_web_download_url = "https://localhost:#{@kwdport}/c=%{uuid_or_pdh}"
-    CollectionsController.any_instance.expects(:file_enumerator).never
   end
 end
 
@@ -241,3 +240,19 @@ class ActionDispatch::IntegrationTest
     end
   end
 end
+
+def upload_data_and_get_collection(data, user, filename, owner_uuid=nil)
+  token = api_fixture('api_client_authorizations')[user]['api_token']
+  datablock = `echo -n #{data.shellescape} | ARVADOS_API_TOKEN=#{token.shellescape} arv-put --no-progress --raw -`.strip
+  assert $?.success?, $?
+  col = nil
+  use_token user do
+    mtxt = ". #{datablock} 0:#{data.length}:#{filename}\n"
+    if owner_uuid
+      col = Collection.create(manifest_text: mtxt, owner_uuid: owner_uuid)
+    else
+      col = Collection.create(manifest_text: mtxt)
+    end
+  end
+  return col
+end
index b71ac890b209908608d2eb59d35fb37bd9f85e43..841638048205d20de4a8d05f638e74a0dc8dce8c 100644 (file)
@@ -32,7 +32,7 @@ debian8,debian9,ubuntu1204,ubuntu1404,ubuntu1604,centos7|docker-py|1.7.2|2|pytho
 debian8,debian9,ubuntu1204,centos7|six|1.10.0|2|python3|all
 debian8,debian9,ubuntu1204,ubuntu1404,centos7|requests|2.12.4|2|python3|all
 debian8,debian9,ubuntu1204,ubuntu1404,ubuntu1604,centos7|websocket-client|0.37.0|2|python3|all
-ubuntu1204|requests|2.12.4|2|python|all
+ubuntu1204,ubuntu1404|requests|2.4.3|2|python|all
 ubuntu1204,centos7|contextlib2|0.5.4|2|python|all
 ubuntu1204,centos7|isodate|0.5.4|2|python|all
 centos7|daemon|2.1.1|2|python|all
@@ -41,7 +41,8 @@ centos7|pyparsing|2.1.10|2|python|all
 centos7|keepalive|0.5|2|python|all
 debian8,debian9,ubuntu1204,ubuntu1404,ubuntu1604,centos7|lockfile|0.12.2|2|python|all|--epoch 1
 all|ruamel.yaml|0.13.7|2|python|amd64|--python-setup-py-arguments --single-version-externally-managed
-all|cwltest|1.0.20160907111242|3|python|all|--depends 'python-futures >= 3.0.5'
+all|cwltest|1.0.20170809112706|3|python|all|--depends 'python-futures >= 3.0.5'
+all|junit-xml|1.7|3|python|all
 all|rdflib-jsonld|0.4.0|2|python|all
 all|futures|3.0.5|2|python|all
 all|future|0.16.0|2|python|all
index 9f4f8125debb50836bd44604aa98529dfc6932e1..fd7b38e8b64e66e3c18275578890a84c3ccdd2a4 100755 (executable)
@@ -128,6 +128,8 @@ timer_reset
 # clean up the docker build environment
 cd "$WORKSPACE"
 
+title "Starting arvbox build localdemo"
+
 tools/arvbox/bin/arvbox build localdemo
 ECODE=$?
 
@@ -136,6 +138,8 @@ if [[ "$ECODE" != "0" ]]; then
     EXITCODE=$(($EXITCODE + $ECODE))
 fi
 
+title "Starting arvbox build dev"
+
 tools/arvbox/bin/arvbox build dev
 
 ECODE=$?
index 39c40934311c9786ba0ed7786818807d77086ab6..2958d3323d9972f7cf826ef1fb0cb69c05d448eb 100755 (executable)
@@ -340,6 +340,7 @@ fi
 # Go binaries
 cd $WORKSPACE/packages/$TARGET
 export GOPATH=$(mktemp -d)
+go get -v github.com/kardianos/govendor
 package_go_binary sdk/go/crunchrunner crunchrunner \
     "Crunchrunner executes a command inside a container and uploads the output"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
@@ -619,6 +620,7 @@ if [[ "$?" == "0" ]] ; then
       \cp config/application.yml.example config/application.yml -f
       \cp config/environments/production.rb.example config/environments/production.rb -f
       sed -i 's/secret_token: ~/secret_token: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/' config/application.yml
+      sed -i 's/keep_web_url: false/keep_web_url: exampledotcom/' config/application.yml
 
       RAILS_ENV=production RAILS_GROUPS=assets bundle exec rake assets:precompile >/dev/null
 
index ae5ad6d49bff34557c68eacebb8976d97ab4b2db..cf7755b68de780631cee4319ea720160146ffdff 100755 (executable)
@@ -103,19 +103,27 @@ package_go_binary() {
 
     mkdir -p "$GOPATH/src/git.curoverse.com"
     ln -sfn "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+    (cd "$GOPATH/src/git.curoverse.com/arvados.git" && "$GOPATH/bin/govendor" sync -v)
 
     cd "$GOPATH/src/git.curoverse.com/arvados.git/$src_path"
     local version="$(version_from_git)"
     local timestamp="$(timestamp_from_git)"
 
-    # If the command imports anything from the Arvados SDK, bump the
-    # version number and build a new package whenever the SDK changes.
+    # Update the version number and build a new package if the vendor
+    # bundle has changed, or the command imports anything from the
+    # Arvados SDK and the SDK has changed.
+    declare -a checkdirs=(vendor)
     if grep -qr git.curoverse.com/arvados .; then
-        cd "$GOPATH/src/git.curoverse.com/arvados.git/sdk/go"
-        if [[ $(timestamp_from_git) -gt "$timestamp" ]]; then
+        checkdirs+=(sdk/go)
+    fi
+    for dir in ${checkdirs[@]}; do
+        cd "$GOPATH/src/git.curoverse.com/arvados.git/$dir"
+        ts="$(timestamp_from_git)"
+        if [[ "$ts" -gt "$timestamp" ]]; then
             version=$(version_from_git)
+            timestamp="$ts"
         fi
-    fi
+    done
 
     cd $WORKSPACE/packages/$TARGET
     test_package_presence $prog $version go
index 3952b36604102cf6f3fd78fceb95eac915d8c2d4..20780811a58e5ecd59ce9c4b399f3b914c462480 100755 (executable)
@@ -81,7 +81,7 @@ services/keepstore
 services/keep-balance
 services/login-sync
 services/nodemanager
-services/nodemanager-integration
+services/nodemanager_integration
 services/crunch-run
 services/crunch-dispatch-local
 services/crunch-dispatch-slurm
@@ -545,6 +545,9 @@ do_test() {
         apps/workbench_units | apps/workbench_functionals | apps/workbench_integration)
             suite=apps/workbench
             ;;
+        services/nodemanager | services/nodemanager_integration)
+            suite=services/nodemanager_suite
+            ;;
         *)
             suite="${1}"
             ;;
@@ -860,11 +863,11 @@ test_login-sync() {
 }
 do_test services/login-sync login-sync
 
-test_nodemanager-integration() {
+test_nodemanager_integration() {
     cd "$WORKSPACE/services/nodemanager" \
-        && tests/integration_test.py ${testargs[services/nodemanager-integration]}
+        && tests/integration_test.py ${testargs[services/nodemanager_integration]}
 }
-do_test services/nodemanager-integration nodemanager-integration
+do_test services/nodemanager_integration nodemanager_integration
 
 for p in "${pythonstuff[@]}"
 do
index f4915c0e3e9f8e34a61ba20d488bd8edd3428190..056ef0d185e61c7bbc52b692abd21ea61d9afdd4 100644 (file)
@@ -93,7 +93,7 @@ func (r *Reporter) Stop() {
 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
        content, err := ioutil.ReadAll(in)
        if err != nil {
-               r.Logger.Print(err)
+               r.Logger.Printf("warning: %v", err)
        }
        return content, err
 }
@@ -169,7 +169,7 @@ func (r *Reporter) getContainerNetStats() (io.Reader, error) {
                statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
                stats, err := ioutil.ReadFile(statsFilename)
                if err != nil {
-                       r.Logger.Print(err)
+                       r.Logger.Printf("notice: %v", err)
                        continue
                }
                return strings.NewReader(string(stats)), nil
@@ -416,7 +416,7 @@ func (r *Reporter) waitForCIDFile() bool {
                select {
                case <-ticker.C:
                case <-r.done:
-                       r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
+                       r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
                        return false
                }
        }
@@ -439,9 +439,9 @@ func (r *Reporter) waitForCgroup() bool {
                select {
                case <-ticker.C:
                case <-warningTimer:
-                       r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
+                       r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
                case <-r.done:
-                       r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
+                       r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)
                        return false
                }
        }
index 962a579ec172b237e4668ca8ea5c1acbc069b8f5..c27e39241df08af2c925a791e6fd849afc496b90 100644 (file)
@@ -44,7 +44,7 @@ func TestReadAllOrWarnFail(t *testing.T) {
        <-done
        if err != nil {
                t.Fatal(err)
-       } else if matched, err := regexp.MatchString("^read /proc/self/mem: .*", string(msg)); err != nil || !matched {
+       } else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched {
                t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
        }
 }
index 5a92176e7f02fba11525190ccee511339819e1d2..fd598c9dd37bbfdd88aa12e2e84d5d73c6929bf9 100755 (executable)
@@ -1022,7 +1022,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
   delete $Jobstep->{tempfail};
 
   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
-  $Jobstep->{'arvados_task'}->save;
+  retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
 
   splice @jobstep_todo, $todo_ptr, 1;
   --$todo_ptr;
@@ -1205,7 +1205,7 @@ sub reapchildren
             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
             exit_status_s($childstatus)));
       $Jobstep->{'arvados_task'}->{success} = 0;
-      $Jobstep->{'arvados_task'}->save;
+      retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
       $task_success = 0;
     }
 
@@ -1258,7 +1258,7 @@ sub reapchildren
     $Jobstep->{exitcode} = $childstatus;
     $Jobstep->{finishtime} = time;
     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
-    $Jobstep->{'arvados_task'}->save;
+    retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
     process_stderr_final ($jobstepidx);
     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
                               length($Jobstep->{'arvados_task'}->{output}),
@@ -1544,7 +1544,7 @@ sub preprocess_stderr
         $st->{node}->{fail_count}++;
       }
     }
-    elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
+    elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
       $jobstep[$jobstepidx]->{tempfail} = 1;
       if (defined($job_slot_index)) {
         $slot[$job_slot_index]->{node}->{fail_count}++;
@@ -2177,8 +2177,22 @@ sub retry_op {
   # that can be retried, the second function will be called with
   # the current try count (0-based), next try time, and error message.
   my $operation = shift;
-  my $retry_callback = shift;
+  my $op_text = shift;
   my $retries = retry_count();
+  my $retry_callback = sub {
+    my ($try_count, $next_try_at, $errmsg) = @_;
+    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
+    $errmsg =~ s/\s/ /g;
+    $errmsg =~ s/\s+$//;
+    my $retry_msg;
+    if ($next_try_at < time) {
+      $retry_msg = "Retrying.";
+    } else {
+      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
+      $retry_msg = "Retrying at $next_try_fmt.";
+    }
+    Log(undef, "$op_text failed: $errmsg. $retry_msg");
+  };
   foreach my $try_count (0..$retries) {
     my $next_try = time + (2 ** $try_count);
     my $result = eval { $operation->(@_); };
@@ -2201,25 +2215,11 @@ sub api_call {
   # This function will call that method, retrying as needed until
   # the current retry_count is exhausted, with a log on the first failure.
   my $method_name = shift;
-  my $log_api_retry = sub {
-    my ($try_count, $next_try_at, $errmsg) = @_;
-    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
-    $errmsg =~ s/\s/ /g;
-    $errmsg =~ s/\s+$//;
-    my $retry_msg;
-    if ($next_try_at < time) {
-      $retry_msg = "Retrying.";
-    } else {
-      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
-      $retry_msg = "Retrying at $next_try_fmt.";
-    }
-    Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
-  };
   my $method = $arv;
   foreach my $key (split(/\//, $method_name)) {
     $method = $method->{$key};
   }
-  return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
+  return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
 }
 
 sub exit_status_s {
index 695597f839c1ceee5ccbb67f3391218d3d1a8e34..7f4b5c7549314b0d0dbd3cfbf52b1023ad7887fd 100644 (file)
@@ -113,6 +113,7 @@ class ArvCwlRunner(object):
                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                 num_retries=self.num_retries,
                                                 overrides=kwargs.get("override_tools"))
+        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
index 4ab65d9d8774708613787b3a694f64bf876004da..769a63bce3f56763e7fa1767317d5af9828a03d0 100644 (file)
@@ -363,6 +363,9 @@ class RunnerContainer(Runner):
         if self.arvrunner.trash_intermediate:
             command.append("--trash-intermediate")
 
+        if self.arvrunner.project_uuid:
+            command.append("--project-uuid="+self.arvrunner.project_uuid)
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command
index f42e6d8c9c356cfcee7c165d554359cdec40623b..6b736a5a7d872ff60eae3bdeffc1e55c66de40c0 100644 (file)
@@ -36,7 +36,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
     with SourceLine(dockerRequirement, "dockerImageId", WorkflowException):
         sp = dockerRequirement["dockerImageId"].split(":")
         image_name = sp[0]
-        image_tag = sp[1] if len(sp) > 1 else None
+        image_tag = sp[1] if len(sp) > 1 else "latest"
 
         images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                                 image_name=image_name,
@@ -51,9 +51,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
             if project_uuid:
                 args.append("--project-uuid="+project_uuid)
             args.append(image_name)
-            if image_tag:
-                args.append(image_tag)
-            logger.info("Uploading Docker image %s", ":".join(args[1:]))
+            args.append(image_tag)
+            logger.info("Uploading Docker image %s:%s", image_name, image_tag)
             try:
                 arvados.commands.keepdocker.main(args, stdout=sys.stderr)
             except SystemExit as e:
index 35a068f91be7b28b7eebd0f32538e9222367d8a1..b667dac1ca5cec6f272c390be8fcd17e1628764c 100644 (file)
@@ -15,7 +15,7 @@ class ArvadosCommandTool(CommandLineTool):
         self.arvrunner = arvrunner
         self.work_api = kwargs["work_api"]
 
-    def makeJobRunner(self, use_container=True):
+    def makeJobRunner(self, **kwargs):
         if self.work_api == "containers":
             return ArvadosContainer(self.arvrunner)
         elif self.work_api == "jobs":
index 93e2819084601d784a973c33ca30bbbdb2d6db49..08e203b87908aa13d702ee983b1c39617a9ca8a2 100644 (file)
@@ -227,6 +227,9 @@ workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
 pipeline_template_uuid_pattern = re.compile(r'[a-z0-9]{5}-p5p6p-[a-z0-9]{15}')
 
 def collectionResolver(api_client, document_loader, uri, num_retries=4):
+    if uri.startswith("keep:") or uri.startswith("arvwf:"):
+        return uri
+
     if workflow_uuid_pattern.match(uri):
         return "arvwf:%s#main" % (uri)
 
index e39c7d23ce22fe71e94d7956e2f5abe4a8e323e3..5a2d814f5d0f614caefea7b1a08c9c36f6fc8925 100644 (file)
@@ -205,10 +205,20 @@ class ArvPathMapper(PathMapper):
 class StagingPathMapper(PathMapper):
     _follow_dirs = True
 
+    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
+        self.targets = set()
+        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
+
     def visit(self, obj, stagedir, basedir, copy=False, staged=False):
         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
         loc = obj["location"]
         tgt = os.path.join(stagedir, obj["basename"])
+        basetgt, baseext = os.path.splitext(tgt)
+        n = 1
+        while tgt in self.targets:
+            n += 1
+            tgt = "%s_%i%s" % (basetgt, n, baseext)
+        self.targets.add(tgt)
         if obj["class"] == "Directory":
             self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
             if loc.startswith("_:") or self._follow_dirs:
index 087fed3e16e72cb26c95500b4ccb03a83bf71806..bb4fac2ae9541a55872ff6fd371b380659a0f15e 100644 (file)
@@ -161,6 +161,8 @@ def upload_docker(arvrunner, tool):
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
             arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+        else:
+            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
index db11705f4ed4b99d7bfd03dec59a2d6b3de20e3d..50f9cf4220d8064b98099c83b93462592d11ba5e 100644 (file)
@@ -51,8 +51,8 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20170707200431',
-          'schema-salad==2.6.20170630075932',
+          'cwltool==1.0.20170828135420',
+          'schema-salad==2.6.20170712194300',
           'typing==3.5.3.0',
           'ruamel.yaml==0.13.7',
           'arvados-python-client>=0.1.20170526013812',
index 3d6b91536a2f2732ad2e97d5c172bf41e2ee60e7..49545a83dc7ac34eea9acc11dc3e022f2839f22c 100644 (file)
@@ -84,7 +84,13 @@ def stubs(func):
                 "uuid": "",
                 "portable_data_hash": "99999999999999999999999999999998+99",
                 "manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
-            }}
+            },
+            "99999999999999999999999999999994+99": {
+                "uuid": "",
+                "portable_data_hash": "99999999999999999999999999999994+99",
+                "manifest_text": ". 99999999999999999999999999999994+99 0:0:expect_arvworkflow.cwl"
+            }
+        }
         stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
         stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections)
 
@@ -152,7 +158,8 @@ def stubs(func):
                             'class': 'File',
                             'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
                             "nameext": ".txt",
-                            "nameroot": "blorp"
+                            "nameroot": "blorp",
+                            "size": 16
                         }},
                         'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
                               'listing': [
@@ -217,7 +224,8 @@ def stubs(func):
                             'class': 'File',
                             'location': u'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
                             "nameext": ".txt",
-                            "nameroot": "blorp"
+                            "nameroot": "blorp",
+                            "size": 16
                         },
                         'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
                             {'basename': 'renamed.txt',
@@ -852,6 +860,31 @@ class TestSubmit(unittest.TestCase):
                          stubs.expect_container_request_uuid + '\n')
 
 
+    @stubs
+    def test_submit_container_project(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--project-uuid="+project_uuid,
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["owner_uuid"] = project_uuid
+        expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                                       '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid,
+                                       '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+
     @stubs
     def test_submit_job_runner_image(self, stubs):
         capture_stdout = cStringIO.StringIO()
index 32a9255e90d442a475394036ea54dd55a797e6fa..f45077197fef194662c206a72045b2a26ddaae24 100644 (file)
@@ -24,7 +24,7 @@ $graph:
   - id: '#main/x'
     type: File
     default: {class: File, location: 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
-      basename: blorp.txt, nameroot: blorp, nameext: .txt}
+      size: 16, basename: blorp.txt, nameroot: blorp, nameext: .txt}
   - id: '#main/y'
     type: Directory
     default: {class: Directory, location: 'keep:99999999999999999999999999999998+99',
index 191f80cbcb11ff9ac44c8e08f37df13e375e4243..2a9cea5802e3437df58dbea37da8cde9f718ec88 100644 (file)
@@ -6,6 +6,7 @@
   "$graph": [
     {
       "class": "Workflow",
+      "cwlVersion": "v1.0",
       "hints": [],
       "id": "#main",
       "inputs": [
index 7d39d678f8ec02c52f6446461c4de9f8e95c142a..7e588be17bb16c04cdbd6098b8dbff8f7c599d18 100644 (file)
@@ -31,7 +31,7 @@ type Mount struct {
        Path              string      `json:"path"`
        Content           interface{} `json:"content"`
        ExcludeFromOutput bool        `json:"exclude_from_output"`
-       Capacity          int64       `json:capacity`
+       Capacity          int64       `json:"capacity"`
 }
 
 // RuntimeConstraints specify a container's compute resources (RAM,
index 29eebdbf729d557a88e121b582cdd78171e31bdd..773a2e6f9c7d787406511f85a6a5585596153738 100644 (file)
@@ -21,7 +21,7 @@ type TransactionError struct {
 }
 
 func (e TransactionError) Error() (s string) {
-       s = fmt.Sprintf("request failed: %s", e.URL)
+       s = fmt.Sprintf("request failed: %s", e.URL.String())
        if e.Status != "" {
                s = s + ": " + e.Status
        }
index f19511ea70791dbd17a201825bfaadc00515082c..81b9587c36aca32d9495307af2ede17112b525aa 100644 (file)
@@ -44,7 +44,7 @@ type Handler struct {
        Routes Routes
 
        // If non-nil, Log is called after handling each request. The
-       // error argument is nil if the request was succesfully
+       // error argument is nil if the request was successfully
        // authenticated and served, even if the health check itself
        // failed.
        Log func(*http.Request, error)
index 03d542805161481f4023c7c9084a2e0cce3c7b16..4523f7d6b3ac38561e17c50b889201166f22baad 100644 (file)
@@ -46,9 +46,12 @@ sub process_request
     $self->{'req'} = new HTTP::Request (%req);
     $self->{'req'}->header('Authorization' => ('OAuth2 ' . $self->{'authToken'})) if $self->{'authToken'};
     $self->{'req'}->header('Accept' => 'application/json');
+
+    # allow_nonref lets us encode JSON::true and JSON::false, see #12078
+    my $json = JSON->new->allow_nonref;
     my ($p, $v);
     while (($p, $v) = each %{$self->{'queryParams'}}) {
-        $content{$p} = (ref($v) eq "") ? $v : JSON::encode_json($v);
+        $content{$p} = (ref($v) eq "") ? $v : $json->encode($v);
     }
     my $content;
     while (($p, $v) = each %content) {
index afd9bdcd89d07b2b4447b059a33b49ccbe483a6c..ec4ae8fb6f971a8d19307a58311b2dec8eed70bc 100644 (file)
@@ -710,6 +710,7 @@ class ArvPutUploadJob(object):
             elif file_in_local_collection.permission_expired():
                 # Permission token expired, re-upload file. This will change whenever
                 # we have a API for refreshing tokens.
+                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                 should_upload = True
                 self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
index d338ee3afa7f19d3f18f56981ca9a19caec59e08..37dab55d60351b69bf97980f1dd9fa1376e4303b 100644 (file)
@@ -50,9 +50,10 @@ def main(arguments=None):
             if "job" in components[c]:
                 pipeline_jobs.add(components[c]["job"]["uuid"])
         if known_component_jobs != pipeline_jobs:
+            new_filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
+            ws.subscribe(new_filters)
             ws.unsubscribe(filters)
-            filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
-            ws.subscribe([['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]])
+            filters = new_filters
             known_component_jobs = pipeline_jobs
 
     api = arvados.api('v1')
@@ -88,7 +89,8 @@ def main(arguments=None):
                 sys.stdout.write(ev["properties"]["text"])
             elif ev["event_type"] in ("create", "update"):
                 if ev["object_kind"] == "arvados#pipelineInstance":
-                    update_subscribed_components(ev["properties"]["new_attributes"]["components"])
+                    c = api.pipeline_instances().get(uuid=ev["object_uuid"]).execute()
+                    update_subscribed_components(c["components"])
 
                 if ev["object_kind"] == "arvados#pipelineInstance" and args.pipeline:
                     if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Paused"):
index 97e1d26d2ba2e1cf7ad503ab80b2974676c87cf8..1a973586051769e816103553e22326839a0c3670 100644 (file)
@@ -24,6 +24,8 @@ collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
+job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
+container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
 
 def clear_tmpdir(path=None):
index ce6618132d011056c13580c9497abf4072cc7dc3..346167846cd5ed185453ae85553a1676718e53d6 100644 (file)
@@ -9,21 +9,23 @@ standard_library.install_aliases()
 from builtins import str
 from builtins import range
 import apiclient
+import datetime
+import hashlib
+import json
 import mock
 import os
 import pwd
+import random
 import re
 import shutil
 import subprocess
 import sys
 import tempfile
+import threading
 import time
 import unittest
-import yaml
-import threading
-import hashlib
-import random
 import uuid
+import yaml
 
 import arvados
 import arvados.commands.put as arv_put
@@ -727,6 +729,9 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         cls.ENVIRON = os.environ.copy()
         cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
 
+    def datetime_to_hex(self, dt):
+        return hex(int(time.mktime(dt.timetuple())))[2:]
+
     def setUp(self):
         super(ArvPutIntegrationTest, self).setUp()
         arv_put.api_client = None
@@ -840,6 +845,49 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         self.assertEqual(1, len(collection_list))
         return collection_list[0]
 
+    def test_expired_token_invalidates_cache(self):
+        self.authorize_with('active')
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+            f.write('foo')
+        # Upload a directory and get the cache file name
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+        self.assertEqual(p.returncode, 0)
+        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+                                   err.decode()).groups()[0]
+        self.assertTrue(os.path.isfile(cache_filepath))
+        # Load the cache file contents and modify the manifest to simulate
+        # an expired access token
+        with open(cache_filepath, 'r') as c:
+            cache = json.load(c)
+        self.assertRegex(cache['manifest'], r'\+A\S+\@')
+        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+        cache['manifest'] = re.sub(
+            r'\@.*? ',
+            "@{} ".format(self.datetime_to_hex(a_month_ago)),
+            cache['manifest'])
+        with open(cache_filepath, 'w') as c:
+            c.write(json.dumps(cache))
+        # Re-run the upload and expect to get an invalid cache message
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(
+            err.decode(),
+            r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
+        self.assertEqual(p.returncode, 0)
+        # Confirm that the resulting cache is different from the last run.
+        with open(cache_filepath, 'r') as c2:
+            new_cache = json.load(c2)
+        self.assertNotEqual(cache['manifest'], new_cache['manifest'])
+
     def test_put_collection_with_later_update(self):
         tmpdir = self.make_tmpdir()
         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
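
The new test above simulates an expired Keep permission signature by rewriting the hex timestamp that follows the "@" in each +A... hint of the cached manifest. A rough standalone illustration of the same rewrite, assuming a manifest string in the format arv-put caches; this is a sketch, not code from the commit:

    import datetime
    import re
    import time

    def backdate_signatures(manifest_text, days=30):
        # Replace the expiry timestamp in each '+A<signature>@<hex time>' hint
        # with one that is already in the past, so the hints look expired.
        past = datetime.datetime.now() - datetime.timedelta(days=days)
        past_hex = hex(int(time.mktime(past.timetuple())))[2:]
        return re.sub(r'@.*? ', '@{} '.format(past_hex), manifest_text)
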
index c59e59d63c5f027de57eda15e36d7a3676a156df..f1f03c31250b7789fb5a3ac385b4f6c3d9b30154 100644 (file)
@@ -47,24 +47,24 @@ GEM
       activemodel (>= 3.0.0)
       activesupport (>= 3.0.0)
       rack (>= 1.1.0)
-    addressable (2.5.0)
+    addressable (2.5.1)
       public_suffix (~> 2.0, >= 2.0.2)
     andand (1.3.3)
     arel (6.0.4)
-    arvados (0.1.20170215224121)
+    arvados (0.1.20170629115132)
       activesupport (>= 3, < 4.2.6)
       andand (~> 1.3, >= 1.3.3)
       google-api-client (>= 0.7, < 0.8.9)
       i18n (~> 0)
-      json (~> 1.7, >= 1.7.7)
+      json (>= 1.7.7, < 3)
       jwt (>= 0.1.5, < 2)
-    arvados-cli (0.1.20170322173355)
+    arvados-cli (0.1.20170817171636)
       activesupport (>= 3.2.13, < 5)
       andand (~> 1.3, >= 1.3.3)
       arvados (~> 0.1, >= 0.1.20150128223554)
       curb (~> 0.8)
       google-api-client (~> 0.6, >= 0.6.3, < 0.8.9)
-      json (~> 1.7, >= 1.7.7)
+      json (>= 1.7.7, < 3)
       oj (~> 2.0, >= 2.0.3)
       trollop (~> 2.0)
     autoparse (0.3.3)
@@ -125,7 +125,7 @@ GEM
     hashie (3.5.5)
     highline (1.7.8)
     hike (1.2.3)
-    i18n (0.8.1)
+    i18n (0.8.6)
     jquery-rails (4.2.2)
       rails-dom-testing (>= 1, < 3)
       railties (>= 4.2.0)
@@ -136,7 +136,7 @@ GEM
       addressable (~> 2.3)
     libv8 (3.16.14.19)
     little-plugger (1.1.4)
-    logging (2.2.0)
+    logging (2.2.2)
       little-plugger (~> 1.1)
       multi_json (~> 1.10)
     lograge (0.4.1)
@@ -148,13 +148,13 @@ GEM
       nokogiri (>= 1.5.9)
     mail (2.6.4)
       mime-types (>= 1.16, < 4)
-    memoist (0.15.0)
+    memoist (0.16.0)
     metaclass (0.0.4)
     mime-types (3.1)
       mime-types-data (~> 3.2015)
     mime-types-data (3.2016.0521)
     mini_portile2 (2.1.0)
-    minitest (5.10.1)
+    minitest (5.10.3)
     mocha (1.2.1)
       metaclass (~> 0.0.1)
     multi_json (1.12.1)
@@ -269,7 +269,7 @@ GEM
     thread_safe (0.3.6)
     tilt (1.4.1)
     trollop (2.1.2)
-    tzinfo (1.2.2)
+    tzinfo (1.2.3)
       thread_safe (~> 0.1)
     uglifier (2.7.2)
       execjs (>= 0.3.0)
@@ -321,4 +321,4 @@ DEPENDENCIES
   uglifier (~> 2.0)
 
 BUNDLED WITH
-   1.14.3
+   1.15.1
index 3986af9dc347aebe33ccd8fd26ccd4060eccee77..6d55506bb5742cf9be7c183186b60f6479ab5226 100644 (file)
@@ -16,7 +16,7 @@ class Arvados::V1::HealthcheckController < ApplicationController
   before_filter :check_auth_header
 
   def check_auth_header
-    mgmt_token = Rails.configuration.management_token
+    mgmt_token = Rails.configuration.ManagementToken
     auth_header = request.headers['Authorization']
 
     if !mgmt_token
index 8dafd1c2da713caa21ae6e1be392d21b7400be88..2f32556733b1a8a186bc3ad51a540518c485ac57 100644 (file)
@@ -444,7 +444,7 @@ common:
 
   # Token to be included in all healthcheck requests. Disabled by default.
   # Server expects request header of the format "Authorization: Bearer xxx"
-  management_token: false
+  ManagementToken: false
 
 development:
   force_ssl: false
index 282bdf14eb0c3c5b4298927536a1009e541f4f8a..551eefa8787baf10e3507a8d6768484de06fb8df 100644 (file)
@@ -13,7 +13,7 @@ class Arvados::V1::HealthcheckControllerTest < ActionController::TestCase
     [true, 'Bearer configuredmanagementtoken', 200, '{"health":"OK"}'],
   ].each do |enabled, header, error_code, error_msg|
     test "ping when #{if enabled then 'enabled' else 'disabled' end} with header '#{header}'" do
-      Rails.configuration.management_token = 'configuredmanagementtoken' if enabled
+      Rails.configuration.ManagementToken = 'configuredmanagementtoken' if enabled
 
       @request.headers['Authorization'] = header
       get :ping
index a43a556866d524920b04303aea252ff127ac8c44..30770fc0152a58125495420ebed2d8836768cfd3 100644 (file)
@@ -394,30 +394,33 @@ class Operations(llfuse.Operations):
 
     @catch_exceptions
     def on_event(self, ev):
-        if 'event_type' not in ev:
+        if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
             return
         with llfuse.lock:
-            new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
-            pdh = new_attrs.get("portable_data_hash")
-            # new_attributes.modified_at currently lacks
-            # subsecond precision (see #6347) so use event_at
-            # which should always be the same.
-            stamp = ev.get("event_at")
+            properties = ev.get("properties") or {}
+            old_attrs = properties.get("old_attributes") or {}
+            new_attrs = properties.get("new_attributes") or {}
 
             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
                 item.invalidate()
-                if stamp and pdh and ev.get("object_kind") == "arvados#collection":
-                    item.update(to_record_version=(stamp, pdh))
-                else:
-                    item.update()
-
-            oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
+                if ev.get("object_kind") == "arvados#collection":
+                    pdh = new_attrs.get("portable_data_hash")
+                    # new_attributes.modified_at currently lacks
+                    # subsecond precision (see #6347) so use event_at
+                    # which should always be the same.
+                    stamp = ev.get("event_at")
+                    if (stamp and pdh and item.writable() and
+                        item.collection is not None and
+                        item.collection.modified() and
+                        new_attrs.get("is_trashed") is not True):
+                        item.update(to_record_version=(stamp, pdh))
+
+            oldowner = old_attrs.get("owner_uuid")
             newowner = ev.get("object_owner_uuid")
             for parent in (
                     self.inodes.inode_cache.find_by_uuid(oldowner) +
                     self.inodes.inode_cache.find_by_uuid(newowner)):
-                parent.invalidate()
-                parent.update()
+                parent.child_event(ev)
 
     @catch_exceptions
     def getattr(self, inode, ctx=None):
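
The rewritten on_event only fast-forwards a collection inode to a specific record version when the event carries both an event_at stamp and a portable_data_hash, the inode is writable with locally modified content, and the collection is not trashed. A sketch of an update event that would satisfy those checks; all values below are placeholders, not data from this commit:

    # Illustrative collection-update event; UUIDs, hash, and timestamp are made up.
    ev = {
        "event_type": "update",
        "object_kind": "arvados#collection",
        "object_uuid": "zzzzz-4zz18-xxxxxxxxxxxxxxx",
        "event_at": "2017-08-29T05:30:11Z",
        "properties": {
            "new_attributes": {
                "portable_data_hash": "d41d8cd98f00b204e9800998ecf8427e+0",
                "is_trashed": False,
            },
        },
    }
    # For each matching inode, on_event() would then call
    # item.update(to_record_version=(ev["event_at"],
    #             ev["properties"]["new_attributes"]["portable_data_hash"]))
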
index b3717ff07c23cb665505645d869b0670d1566b54..4dad90c86758edb118d7ab4b04958417533b9653 100644 (file)
@@ -205,12 +205,16 @@ class Mount(object):
         self.logger.info("enable write is %s", self.args.enable_write)
 
     def _setup_api(self):
-        self.api = arvados.safeapi.ThreadSafeApiCache(
-            apiconfig=arvados.config.settings(),
-            keep_params={
-                'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
-                'num_retries': self.args.retries,
-            })
+        try:
+            self.api = arvados.safeapi.ThreadSafeApiCache(
+                apiconfig=arvados.config.settings(),
+                keep_params={
+                    'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+                    'num_retries': self.args.retries,
+                })
+        except KeyError as e:
+            self.logger.error("Missing environment: %s", e)
+            exit(1)
         # Do a sanity check that we have a working arvados host + token.
         self.api.users().current().execute()
 
index 34295ef319afb125d1fe4971e37519cdb0ec983c..a51dd909b690df3cb39865d021b8f4daea4b471b 100644 (file)
@@ -139,3 +139,6 @@ class FreshBase(object):
 
     def finalize(self):
         pass
+
+    def child_event(self, ev):
+        pass
index 30ae6b40e0ae95c751ccaa1b3c0760b623d793c5..0178fe5544b07ddb30b9fa9c8e08d734a12cde0c 100644 (file)
@@ -771,6 +771,7 @@ class ProjectDirectory(Directory):
         self._poll_time = poll_time
         self._updating_lock = threading.Lock()
         self._current_user = None
+        self._full_listing = False
 
     def want_event_subscribe(self):
         return True
@@ -793,27 +794,35 @@ class ProjectDirectory(Directory):
     def uuid(self):
         return self.project_uuid
 
+    def items(self):
+        self._full_listing = True
+        return super(ProjectDirectory, self).items()
+
+    def namefn(self, i):
+        if 'name' in i:
+            if i['name'] is None or len(i['name']) == 0:
+                return None
+            elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
+                # collection or subproject
+                return i['name']
+            elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
+                # name link
+                return i['name']
+            elif 'kind' in i and i['kind'].startswith('arvados#'):
+                # something else
+                return "{}.{}".format(i['name'], i['kind'][8:])
+        else:
+            return None
+
+
     @use_counter
     def update(self):
         if self.project_object_file == None:
             self.project_object_file = ObjectFile(self.inode, self.project_object)
             self.inodes.add_entry(self.project_object_file)
 
-        def namefn(i):
-            if 'name' in i:
-                if i['name'] is None or len(i['name']) == 0:
-                    return None
-                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
-                    # collection or subproject
-                    return i['name']
-                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
-                    # name link
-                    return i['name']
-                elif 'kind' in i and i['kind'].startswith('arvados#'):
-                    # something else
-                    return "{}.{}".format(i['name'], i['kind'][8:])
-            else:
-                return None
+        if not self._full_listing:
+            return
 
         def samefn(a, i):
             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
@@ -835,31 +844,62 @@ class ProjectDirectory(Directory):
                     self.project_object = self.api.users().get(
                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
 
-                contents = arvados.util.list_all(self.api.groups().contents,
-                                                 self.num_retries, uuid=self.project_uuid)
+                contents = arvados.util.list_all(self.api.groups().list,
+                                                 self.num_retries,
+                                                 filters=[["owner_uuid", "=", self.project_uuid],
+                                                          ["group_class", "=", "project"]])
+                contents.extend(arvados.util.list_all(self.api.collections().list,
+                                                      self.num_retries,
+                                                      filters=[["owner_uuid", "=", self.project_uuid]]))
 
             # end with llfuse.lock_released, re-acquire lock
 
             self.merge(contents,
-                       namefn,
+                       self.namefn,
                        samefn,
                        self.createDirectory)
         finally:
             self._updating_lock.release()
 
+    def _add_entry(self, i, name):
+        ent = self.createDirectory(i)
+        self._entries[name] = self.inodes.add_entry(ent)
+        return self._entries[name]
+
     @use_counter
     @check_update
-    def __getitem__(self, item):
-        if item == '.arvados#project':
+    def __getitem__(self, k):
+        if k == '.arvados#project':
             return self.project_object_file
-        else:
-            return super(ProjectDirectory, self).__getitem__(item)
+        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
+            return super(ProjectDirectory, self).__getitem__(k)
+        with llfuse.lock_released:
+            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                       ["group_class", "=", "project"],
+                                                       ["name", "=", k]],
+                                              limit=1).execute(num_retries=self.num_retries)["items"]
+            if not contents:
+                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                                ["name", "=", k]],
+                                                       limit=1).execute(num_retries=self.num_retries)["items"]
+        if contents:
+            name = sanitize_filename(self.namefn(contents[0]))
+            if name != k:
+                raise KeyError(k)
+            return self._add_entry(contents[0], name)
+
+        # Didn't find item
+        raise KeyError(k)
 
     def __contains__(self, k):
         if k == '.arvados#project':
             return True
-        else:
-            return super(ProjectDirectory, self).__contains__(k)
+        try:
+            self[k]
+            return True
+        except KeyError:
+            pass
+        return False
 
     @use_counter
     @check_update
@@ -925,6 +965,51 @@ class ProjectDirectory(Directory):
         self._entries[name_new] = ent
         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
 
+    @use_counter
+    def child_event(self, ev):
+        properties = ev.get("properties") or {}
+        old_attrs = properties.get("old_attributes") or {}
+        new_attrs = properties.get("new_attributes") or {}
+        old_attrs["uuid"] = ev["object_uuid"]
+        new_attrs["uuid"] = ev["object_uuid"]
+        old_name = sanitize_filename(self.namefn(old_attrs))
+        new_name = sanitize_filename(self.namefn(new_attrs))
+
+        # create events will have a new name, but not an old name
+        # delete events will have an old name, but not a new name
+        # update events will have an old and new name, and they may be the same or different
+        # if they are the same, an unrelated field changed and there is nothing to do.
+
+        if old_attrs.get("owner_uuid") != self.project_uuid:
+            # Was moved from somewhere else, so don't try to remove entry.
+            old_name = None
+        if ev.get("object_owner_uuid") != self.project_uuid:
+            # Was moved to somewhere else, so don't try to add entry
+            new_name = None
+
+        if ev.get("object_kind") == "arvados#collection":
+            if old_attrs.get("is_trashed"):
+                # Was previously deleted
+                old_name = None
+            if new_attrs.get("is_trashed"):
+                # Has been deleted
+                new_name = None
+
+        if new_name != old_name:
+            ent = None
+            if old_name in self._entries:
+                ent = self._entries[old_name]
+                del self._entries[old_name]
+                self.inodes.invalidate_entry(self.inode, old_name.encode(self.inodes.encoding))
+
+            if new_name:
+                if ent is not None:
+                    self._entries[new_name] = ent
+                else:
+                    self._add_entry(new_attrs, new_name)
+            elif ent is not None:
+                self.inodes.del_entry(ent)
+
 
 class SharedDirectory(Directory):
     """A special directory that represents users or groups who have shared projects with me."""
index 225e4b2d22bc50d8dd8a7a97fae8cf767cc3d638..ec8868af7d799857d0eba14e8478f3030d9969cd 100644 (file)
@@ -220,66 +220,62 @@ class FuseTagsUpdateTest(MountTestBase):
             attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
 
 
+def fuseSharedTestHelper(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            # Double check that we can open and read objects in this folder as a file,
+            # and that its contents are what we expect.
+            baz_path = os.path.join(
+                mounttmp,
+                'FUSE User',
+                'FUSE Test Project',
+                'collection in FUSE project',
+                'baz')
+            with open(baz_path) as f:
+                self.assertEqual("baz", f.read())
+
+            # check mtime on collection
+            st = os.stat(baz_path)
+            try:
+                mtime = st.st_mtime_ns / 1000000000
+            except AttributeError:
+                mtime = st.st_mtime
+            self.assertEqual(mtime, 1391448174)
+
+            # shared_dirs is a list of the directories exposed
+            # by fuse.SharedDirectory (i.e. any object visible
+            # to the current user)
+            shared_dirs = llfuse.listdir(mounttmp)
+            shared_dirs.sort()
+            self.assertIn('FUSE User', shared_dirs)
+
+            # fuse_user_objs is a list of the objects owned by the FUSE
+            # test user (which present as files in the 'FUSE User'
+            # directory)
+            fuse_user_objs = llfuse.listdir(os.path.join(mounttmp, 'FUSE User'))
+            fuse_user_objs.sort()
+            self.assertEqual(['FUSE Test Project',                    # project owned by user
+                              'collection #1 owned by FUSE',          # collection owned by user
+                              'collection #2 owned by FUSE'          # collection owned by user
+                          ], fuse_user_objs)
+
+            # test_proj_files is a list of the files in the FUSE Test Project.
+            test_proj_files = llfuse.listdir(os.path.join(mounttmp, 'FUSE User', 'FUSE Test Project'))
+            test_proj_files.sort()
+            self.assertEqual(['collection in FUSE project'
+                          ], test_proj_files)
+
+
+    Test().runTest()
+
 class FuseSharedTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.SharedDirectory,
                         exclude=self.api.users().current().execute()['uuid'])
+        keep = arvados.keep.KeepClient()
+        keep.put("baz")
 
-        # shared_dirs is a list of the directories exposed
-        # by fuse.SharedDirectory (i.e. any object visible
-        # to the current user)
-        shared_dirs = llfuse.listdir(self.mounttmp)
-        shared_dirs.sort()
-        self.assertIn('FUSE User', shared_dirs)
-
-        # fuse_user_objs is a list of the objects owned by the FUSE
-        # test user (which present as files in the 'FUSE User'
-        # directory)
-        fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
-        fuse_user_objs.sort()
-        self.assertEqual(['FUSE Test Project',                    # project owned by user
-                          'collection #1 owned by FUSE',          # collection owned by user
-                          'collection #2 owned by FUSE',          # collection owned by user
-                          'pipeline instance owned by FUSE.pipelineInstance',  # pipeline instance owned by user
-                      ], fuse_user_objs)
-
-        # test_proj_files is a list of the files in the FUSE Test Project.
-        test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
-        test_proj_files.sort()
-        self.assertEqual(['collection in FUSE project',
-                          'pipeline instance in FUSE project.pipelineInstance',
-                          'pipeline template in FUSE project.pipelineTemplate'
-                      ], test_proj_files)
-
-        # Double check that we can open and read objects in this folder as a file,
-        # and that its contents are what we expect.
-        pipeline_template_path = os.path.join(
-                self.mounttmp,
-                'FUSE User',
-                'FUSE Test Project',
-                'pipeline template in FUSE project.pipelineTemplate')
-        with open(pipeline_template_path) as f:
-            j = json.load(f)
-            self.assertEqual("pipeline template in FUSE project", j['name'])
-
-        # check mtime on template
-        st = os.stat(pipeline_template_path)
-        try:
-            mtime = st.st_mtime_ns / 1000000000
-        except AttributeError:
-            mtime = st.st_mtime
-        self.assertEqual(mtime, 1397493304)
-
-        # check mtime on collection
-        st = os.stat(os.path.join(
-                self.mounttmp,
-                'FUSE User',
-                'collection #1 owned by FUSE'))
-        try:
-            mtime = st.st_mtime_ns / 1000000000
-        except AttributeError:
-            mtime = st.st_mtime
-        self.assertEqual(mtime, 1391448174)
+        self.pool.apply(fuseSharedTestHelper, (self.mounttmp,))
 
 
 class FuseHomeTest(MountTestBase):
index 16df210fe83cc12096549f5609fafeecdcc09340..67d46f6716b2ce22b09cb6b1204b08fdb3c3dd96 100644 (file)
@@ -149,6 +149,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                // SSL certificates. See
                // http://www.w3.org/TR/cors/#user-credentials).
                w.Header().Set("Access-Control-Allow-Origin", "*")
+               w.Header().Set("Access-Control-Expose-Headers", "Content-Range")
        }
 
        arv := h.clientPool.Get()
index 0fe773a59e278b93264bf1d63457a14d9b709ef8..0ab3e969a0ebaa3f0d007e0c764a1e7507f6ec8a 100644 (file)
@@ -133,7 +133,7 @@ func init() {
                &s3UnsafeDelete,
                "s3-unsafe-delete",
                false,
-               "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
+               "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
 }
 
 // S3Volume implements Volume using an S3 bucket.
index 988b83c142b15d6a62058ec05ee0863d03476503..565db6601f18e68f5f621e0838ee06e051038028 100644 (file)
@@ -82,17 +82,20 @@ class BaseNodeManagerActor(pykka.ThreadingActor):
     def __init__(self, *args, **kwargs):
          super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
          self.actor_ref = TellableActorRef(self)
+         self._killfunc = kwargs.get("killfunc", os.kill)
 
     def on_failure(self, exception_type, exception_value, tb):
         lg = getattr(self, "_logger", logging)
         if (exception_type in (threading.ThreadError, MemoryError) or
             exception_type is OSError and exception_value.errno == errno.ENOMEM):
             lg.critical("Unhandled exception is a fatal error, killing Node Manager")
-            os.kill(os.getpid(), signal.SIGKILL)
+            self._killfunc(os.getpid(), signal.SIGKILL)
 
     def ping(self):
         return True
 
+    def get_thread(self):
+        return threading.current_thread()
 
 class WatchdogActor(pykka.ThreadingActor):
     def __init__(self, timeout, *args, **kwargs):
@@ -101,12 +104,13 @@ class WatchdogActor(pykka.ThreadingActor):
          self.actors = [a.proxy() for a in args]
          self.actor_ref = TellableActorRef(self)
          self._later = self.actor_ref.tell_proxy()
+         self._killfunc = kwargs.get("killfunc", os.kill)
 
     def kill_self(self, e, act):
         lg = getattr(self, "_logger", logging)
         lg.critical("Watchdog exception", exc_info=e)
         lg.critical("Actor %s watchdog ping time out, killing Node Manager", act)
-        os.kill(os.getpid(), signal.SIGKILL)
+        self._killfunc(os.getpid(), signal.SIGKILL)
 
     def on_start(self):
         self._later.run()
index fb9a6bf2142d58a63e0eba8d993e4fa2cb5e8ddb..c5dd1adef1f3173446d7c5efb3d8fbfc31d9d771 100644 (file)
@@ -240,6 +240,9 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         return super(ComputeNodeShutdownActor, self)._finished()
 
     def cancel_shutdown(self, reason, **kwargs):
+        if self.cancel_reason is not None:
+            # already cancelled
+            return
         self.cancel_reason = reason
         self._logger.info("Shutdown cancelled: %s.", reason)
         self._finished(success_flag=False)
@@ -257,6 +260,9 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     @_cancel_on_exception
     def shutdown_node(self):
+        if self.cancel_reason is not None:
+            # already cancelled
+            return
         if self.cancellable:
             self._logger.info("Checking that node is still eligible for shutdown")
             eligible, reason = self._monitor.shutdown_eligible().get()
index fa56578cffa1108526584ded9730a9cb5ffbbda9..c8883c3ae70f6614b7bd9063030c14e264dfe543 100644 (file)
@@ -73,7 +73,7 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
         if output in ("drng\n", "alloc\n", "drng*\n", "alloc*\n"):
             self._timer.schedule(time.time() + 10,
                                  self._later.await_slurm_drain)
-        elif output in ("idle\n"):
+        elif output in ("idle\n",):
             # Not in "drng" but idle, don't shut down
             self.cancel_shutdown("slurm state is %s" % output.strip(), try_resume=False)
         else:
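
The one-character change above matters because ("idle\n") is just a parenthesized string, so the membership test degrades to a substring check, while ("idle\n",) is a one-element tuple that requires an exact match:

    # Why the trailing comma matters: `in` against a bare string is a substring test.
    "id" in ("idle\n")        # True  -- "id" is a substring of the string "idle\n"
    "id" in ("idle\n",)       # False -- "id" is not an element of the one-item tuple
    "idle\n" in ("idle\n",)   # True  -- exact match against the tuple element
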
index 1ba4e375a50c5b84b6fb321e38088537bad31b1a..e47f9fcb1d036b78f94af0af25e8c37dc17b5ad0 100644 (file)
@@ -58,7 +58,8 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                        'watchdog': '600',
                        'node_mem_scaling': '0.95'},
             'Manage': {'address': '127.0.0.1',
-                       'port': '-1'},
+                       'port': '-1',
+                       'ManagementToken': ''},
             'Logging': {'file': '/dev/stderr',
                         'level': 'WARNING'}
         }.iteritems():
index ca914e1096def7d28a9be41e90ffcbac2d01d203..4d2d3e0c0ace3e6ff9db5832d3f8a9dcc4b7ad9a 100644 (file)
@@ -8,9 +8,12 @@ from __future__ import absolute_import, print_function
 import logging
 import subprocess
 
+import arvados.util
+
 from . import clientactor
 from .config import ARVADOS_ERRORS
 
+
 class ServerCalculator(object):
     """Generate cloud server wishlists from an Arvados job queue.
 
@@ -58,7 +61,6 @@ class ServerCalculator(object):
         self.max_nodes = max_nodes or float('inf')
         self.max_price = max_price or float('inf')
         self.logger = logging.getLogger('arvnodeman.jobqueue')
-        self.logged_jobs = set()
 
         self.logger.info("Using cloud node sizes:")
         for s in self.cloud_sizes:
@@ -83,20 +85,26 @@ class ServerCalculator(object):
 
     def servers_for_queue(self, queue):
         servers = []
-        seen_jobs = set()
+        unsatisfiable_jobs = {}
         for job in queue:
-            seen_jobs.add(job['uuid'])
             constraints = job['runtime_constraints']
             want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
             cloud_size = self.cloud_size_for_constraints(constraints)
             if cloud_size is None:
-                if job['uuid'] not in self.logged_jobs:
-                    self.logged_jobs.add(job['uuid'])
-                    self.logger.debug("job %s not satisfiable", job['uuid'])
-            elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price):
+                unsatisfiable_jobs[job['uuid']] = (
+                    'Requirements for a single node exceed the available '
+                    'cloud node size')
+            elif (want_count > self.max_nodes):
+                unsatisfiable_jobs[job['uuid']] = (
+                    "Job's min_nodes constraint is greater than the configured "
+                    "max_nodes (%d)" % self.max_nodes)
+            elif (want_count*cloud_size.price <= self.max_price):
                 servers.extend([cloud_size.real] * want_count)
-        self.logged_jobs.intersection_update(seen_jobs)
-        return servers
+            else:
+                unsatisfiable_jobs[job['uuid']] = (
+                    "Job's price (%d) is above system's max_price "
+                    "limit (%d)" % (want_count*cloud_size.price, self.max_price))
+        return (servers, unsatisfiable_jobs)
 
     def cheapest_size(self):
         return self.cloud_sizes[0]
@@ -107,6 +115,7 @@ class ServerCalculator(object):
                 return s
         return None
 
+
 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     """Actor to generate server wishlists from the job queue.
 
@@ -147,7 +156,7 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
             for out in squeue_out.splitlines():
                 try:
                     cpu, ram, disk, reason, jobname = out.split("|", 4)
-                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
+                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
                         queuelist.append({
                             "uuid": jobname,
                             "runtime_constraints": {
@@ -165,7 +174,28 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
         return queuelist
 
     def _got_response(self, queue):
-        server_list = self._calculator.servers_for_queue(queue)
+        server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
+        # Cancel any job/container with unsatisfiable requirements, emitting
+        # a log explaining why.
+        for job_uuid, reason in unsatisfiable_jobs.iteritems():
+            try:
+                self._client.logs().create(body={
+                    'object_uuid': job_uuid,
+                    'event_type': 'stderr',
+                    'properties': {'text': reason},
+                }).execute()
+                # Cancel the job depending on its type
+                if arvados.util.container_uuid_pattern.match(job_uuid):
+                    subprocess.check_call(['scancel', '--name='+job_uuid])
+                elif arvados.util.job_uuid_pattern.match(job_uuid):
+                    self._client.jobs().cancel(uuid=job_uuid).execute()
+                else:
+                    raise Exception('Unknown job type')
+                self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid)
+            except Exception as error:
+                self._logger.error("Trying to cancel job '%s': %s",
+                                   job_uuid,
+                                   error)
         self._logger.debug("Calculated wishlist: %s",
                            ', '.join(s.name for s in server_list) or "(empty)")
         return super(JobQueueMonitorActor, self)._got_response(server_list)
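
servers_for_queue now returns a (servers, unsatisfiable_jobs) pair instead of a bare list, and _got_response uses the second element to log and cancel jobs that can never be scheduled. A minimal sketch of consuming that return value; how the calculator is constructed is not shown here, and the variable names are placeholders:

    # Hypothetical caller of ServerCalculator.servers_for_queue().
    servers, unsatisfiable = calc.servers_for_queue(queue)
    for job_uuid, reason in unsatisfiable.iteritems():   # Python 2, matching this codebase
        print("cannot schedule %s: %s" % (job_uuid, reason))
    print("wishlist: %s" % (", ".join(s.name for s in servers) or "(empty)"))
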
index 85719d3069103e8f3d14296b8ff3946241973ac3..cfd611285cffc91933fefb15a2dee37e14995c7e 100644 (file)
@@ -41,12 +41,34 @@ class Handler(http.server.BaseHTTPRequestHandler, object):
             self.send_header('Content-type', 'application/json')
             self.end_headers()
             self.wfile.write(tracker.get_json())
+        elif self.path == '/_health/ping':
+            code, msg = self.check_auth()
+
+            if code != 200:
+              self.send_response(code)
+              self.wfile.write(msg)
+            else:
+              self.send_response(200)
+              self.send_header('Content-type', 'application/json')
+              self.end_headers()
+              self.wfile.write(json.dumps({"health":"OK"}))
         else:
             self.send_response(404)
 
     def log_message(self, fmt, *args, **kwargs):
         _logger.info(fmt, *args, **kwargs)
 
+    def check_auth(self):
+        mgmt_token = self.server._config.get('Manage', 'ManagementToken')
+        auth_header = self.headers.get('Authorization', None)
+
+        if mgmt_token == '':
+          return 404, "disabled"
+        elif auth_header == None:
+          return 401, "authorization required"
+        elif auth_header != 'Bearer '+mgmt_token:
+          return 403, "authorization error"
+        return 200, ""
 
 class Tracker(object):
     def __init__(self):
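
With the management endpoint above, /_health/ping answers 404 when no ManagementToken is configured, 401 without an Authorization header, 403 on a token mismatch, and 200 with a small JSON body otherwise. A hedged probe of that behavior; the host, port, and token are placeholders, and using the requests library is an illustration rather than a project dependency:

    import requests

    # Hypothetical probe of the node manager management endpoint.
    resp = requests.get('http://127.0.0.1:8989/_health/ping',
                        headers={'Authorization': 'Bearer examplemanagementtoken'})
    if resp.status_code == 200:
        assert resp.json() == {"health": "OK"}
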
index 4d2a1394df2b69b3823b8120524e06af7bcb7cb5..e7e3f25fe383239c310f72e2bd234cefc46f9619 100644 (file)
@@ -19,11 +19,15 @@ class TimedCallBackActor(actor_class):
     message at a later time.  This actor runs the necessary event loop for
     delivery.
     """
-    def __init__(self, max_sleep=1):
+    def __init__(self, max_sleep=1, timefunc=None):
         super(TimedCallBackActor, self).__init__()
         self._proxy = self.actor_ref.tell_proxy()
         self.messages = []
         self.max_sleep = max_sleep
+        if timefunc is None:
+            self._timefunc = time.time
+        else:
+            self._timefunc = timefunc
 
     def schedule(self, delivery_time, receiver, *args, **kwargs):
         if not self.messages:
@@ -33,7 +37,7 @@ class TimedCallBackActor(actor_class):
     def deliver(self):
         if not self.messages:
             return
-        til_next = self.messages[0][0] - time.time()
+        til_next = self.messages[0][0] - self._timefunc()
         if til_next <= 0:
             t, receiver, args, kwargs = heapq.heappop(self.messages)
             try:
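
The new timefunc parameter lets tests substitute a controllable clock for time.time. A minimal sketch of using it; the callback and the way the proxy is obtained are illustrative assumptions, not code from the commit:

    # Hypothetical fake clock for deterministic scheduling tests.
    class FakeClock(object):
        def __init__(self, now=0.0):
            self.now = now
        def __call__(self):
            return self.now

    clock = FakeClock()
    actor = TimedCallBackActor.start(max_sleep=1, timefunc=clock).proxy()
    actor.schedule(10.0, some_callback)   # queued; clock() is still below 10.0
    clock.now = 11.0                      # once the actor's deliver() runs again,
                                          # some_callback() will be invoked
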
index 59d95e3d22f1231030359141298d79f2a547ad8b..d083bf168b50a54087ad398d925e007eab972713 100644 (file)
@@ -33,7 +33,7 @@ setup(name='arvados-node-manager',
       ],
       install_requires=[
           'apache-libcloud>=0.20',
-          'arvados-python-client>=0.1.20150206225333',
+          'arvados-python-client>=0.1.20170731145219',
           'future',
           'pykka',
           'python-daemon',
index feba3ce185caaf46517adc988fd63fafbc4985b1..bdd3ffdcb7e0a247676ddbac49943ce32b25eb30 100755 (executable)
@@ -40,6 +40,7 @@ detail.addHandler(logging.StreamHandler(detail_content))
 fake_slurm = None
 compute_nodes = None
 all_jobs = None
+unsatisfiable_job_scancelled = None
 
 def update_script(path, val):
     with open(path+"_", "w") as f:
@@ -54,6 +55,33 @@ def set_squeue(g):
                   "\n".join("echo '1|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
     return 0
 
+def set_queue_unsatisfiable(g):
+    global all_jobs, unsatisfiable_job_scancelled
+    # Simulate a job requesting a 99 core node.
+    update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
+                  "\n".join("echo '99|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+    update_script(os.path.join(fake_slurm, "scancel"), "#!/bin/sh\n" +
+                  "\ntouch %s" % unsatisfiable_job_scancelled)
+    return 0
+
+def job_cancelled(g):
+    global unsatisfiable_job_scancelled
+    cancelled_job = g.group(1)
+    api = arvados.api('v1')
+    # Check that 'scancel' was called
+    if not os.path.isfile(unsatisfiable_job_scancelled):
+        return 1
+    # Check for the log entry
+    log_entry = api.logs().list(
+        filters=[
+            ['object_uuid', '=', cancelled_job],
+            ['event_type', '=', 'stderr'],
+        ]).execute()['items'][0]
+    if not re.match(
+            r"Requirements for a single node exceed the available cloud node size",
+            log_entry['properties']['text']):
+        return 1
+    return 0
 
 def node_paired(g):
     global compute_nodes
@@ -115,6 +143,9 @@ def expect_count(count, checks, pattern, g):
 
 def run_test(name, actions, checks, driver_class, jobs, provider):
     code = 0
+    global unsatisfiable_job_scancelled
+    unsatisfiable_job_scancelled = os.path.join(tempfile.mkdtemp(),
+                                                "scancel_called")
 
     # Delete any stale node records
     api = arvados.api('v1')
@@ -159,7 +190,7 @@ def run_test(name, actions, checks, driver_class, jobs, provider):
 
     # Test main loop:
     # - Read line
-    # - Apply negative checks (thinks that are not supposed to happen)
+    # - Apply negative checks (things that are not supposed to happen)
     # - Check timeout
     # - Check if the next action should trigger
     # - If all actions are exhausted, terminate with test success
@@ -213,6 +244,7 @@ def run_test(name, actions, checks, driver_class, jobs, provider):
         code = 1
 
     shutil.rmtree(fake_slurm)
+    shutil.rmtree(os.path.dirname(unsatisfiable_job_scancelled))
 
     if code == 0:
         logger.info("%s passed", name)
@@ -228,6 +260,23 @@ def main():
     # Test lifecycle.
 
     tests = {
+        "test_unsatisfiable_jobs" : (
+            # Actions (pattern -> action)
+            [
+                (r".*Daemon started", set_queue_unsatisfiable),
+                (r".*Cancelled unsatisfiable job '(\S+)'", job_cancelled),
+            ],
+            # Checks (things that shouldn't happen)
+            {
+                r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": fail,
+                r".*Trying to cancel job '(\S+)'": fail,
+            },
+            # Driver class
+            "arvnodeman.test.fake_driver.FakeDriver",
+            # Jobs
+            {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+            # Provider
+            "azure"),
         "test_single_node_azure": (
             [
                 (r".*Daemon started", set_squeue),
diff --git a/services/nodemanager/tests/stress_test.cwl b/services/nodemanager/tests/stress_test.cwl
new file mode 100644 (file)
index 0000000..082df64
--- /dev/null
@@ -0,0 +1,51 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+#
+#
+# Usage: arvados-cwl-runner stress_test.cwl
+#
+# Submits 100 jobs or containers, creating load on node manager and
+# scheduler.
+
+class: Workflow
+cwlVersion: v1.0
+requirements:
+  ScatterFeatureRequirement: {}
+  InlineJavascriptRequirement: {}
+inputs: []
+outputs: []
+steps:
+  step1:
+    in: []
+    out: [out]
+    run:
+      class: ExpressionTool
+      inputs: []
+      outputs:
+        out: int[]
+      expression: |
+        ${
+          var r = [];
+          for (var i = 1; i <= 100; i++) {
+            r.push(i);
+          }
+          return {out: r};
+        }
+  step2:
+    in:
+      num: step1/out
+    out: []
+    scatter: num
+    run:
+      class: CommandLineTool
+      requirements:
+        ShellCommandRequirement: {}
+      inputs:
+        num: int
+      outputs: []
+      arguments: [echo, "starting",
+        {shellQuote: false, valueFrom: "&&"},
+        sleep, $((101-inputs.num)*2),
+        {shellQuote: false, valueFrom: "&&"},
+        echo, "the number of the day is", $(inputs.num)]
index a8aa2e38fb46ce2c3e3b0d2ae7f35f01b12e4952..c44305d2b96a66a4cf6ddedf45556f3c58085532 100644 (file)
@@ -100,6 +100,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
             ]
         self.make_actor()
         self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.setup_actor.ping().get(self.TIMEOUT)
         self.assertEqual(1, self.cloud_client.post_create_node.call_count)
 
     def test_instance_exceeded_not_retried(self):
@@ -151,6 +152,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
         self.api_client.nodes().create().execute.side_effect = retry_resp
         self.api_client.nodes().update().execute.side_effect = retry_resp
         self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.setup_actor.ping().get(self.TIMEOUT)
         self.assertEqual(self.setup_actor.actor_ref.actor_urn,
                          subscriber.call_args[0][0].actor_ref.actor_urn)
 
@@ -207,17 +209,19 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
         self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
         self.cloud_client.destroy_node.return_value = True
         self.make_actor(cancellable=True)
-        self.check_success_flag(False)
+        self.check_success_flag(False, 2)
         self.assertFalse(self.cloud_client.destroy_node.called)
 
     def test_uncancellable_shutdown(self, *mocks):
         self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
         self.cloud_client.destroy_node.return_value = True
         self.make_actor(cancellable=False)
-        self.check_success_flag(True, 2)
+        self.check_success_flag(True, 4)
         self.assertTrue(self.cloud_client.destroy_node.called)
 
     def test_arvados_node_cleaned_after_shutdown(self, *mocks):
+        if len(mocks) == 1:
+            mocks[0].return_value = "drain\n"
         cloud_node = testutil.cloud_node_mock(62)
         arv_node = testutil.arvados_node_mock(62)
         self.make_mocks(cloud_node, arv_node)
@@ -235,12 +239,15 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
         self.assertTrue(update_mock().execute.called)
 
     def test_arvados_node_not_cleaned_after_shutdown_cancelled(self, *mocks):
+        if len(mocks) == 1:
+            mocks[0].return_value = "idle\n"
         cloud_node = testutil.cloud_node_mock(61)
         arv_node = testutil.arvados_node_mock(61)
         self.make_mocks(cloud_node, arv_node, shutdown_open=False)
         self.cloud_client.destroy_node.return_value = False
         self.make_actor(cancellable=True)
         self.shutdown_actor.cancel_shutdown("test")
+        self.shutdown_actor.ping().get(self.TIMEOUT)
         self.check_success_flag(False, 2)
         self.assertFalse(self.arvados_client.nodes().update.called)
 
@@ -338,13 +345,11 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
     def test_in_state_when_no_state_available(self):
         self.make_actor(arv_node=testutil.arvados_node_mock(
                 crunch_worker_state=None))
-        print(self.node_actor.get_state().get())
         self.assertTrue(self.node_state('idle'))
 
     def test_in_state_when_no_state_available_old(self):
         self.make_actor(arv_node=testutil.arvados_node_mock(
                 crunch_worker_state=None, age=90000))
-        print(self.node_actor.get_state().get())
         self.assertTrue(self.node_state('down'))
 
     def test_in_idle_state(self):
index c7eb7afc631cd2b06e2c7e6753b2e696ab27b520..0b6162dfaa64405df53794bb575bfffd2420bbff 100644 (file)
@@ -32,13 +32,20 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
             self.timer = testutil.MockTimer(False)
         self.make_actor()
         self.check_success_flag(None, 0)
+        # At this point, 1st try should have happened.
+
         self.timer.deliver()
         self.check_success_flag(None, 0)
-        self.timer.deliver()
+        # At this point, 2nd try should have happened.
+
         # Order is critical here: if the mock gets called when no return value
         # or side effect is set, we may invoke a real subprocess.
         proc_mock.return_value = end_state
         proc_mock.side_effect = None
+
+        # 3rd try
+        self.timer.deliver()
+
         self.check_success_flag(True, 3)
         self.check_slurm_got_args(proc_mock, 'NodeName=compute63')
 
@@ -67,20 +74,18 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         self.check_success_flag(True)
         self.assertFalse(proc_mock.called)
 
-    def test_node_undrained_when_shutdown_cancelled(self, proc_mock):
+    def test_node_resumed_when_shutdown_cancelled(self, proc_mock):
         try:
             proc_mock.side_effect = iter(['', 'drng\n', 'drng\n', ''])
             self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
             self.timer = testutil.MockTimer(False)
             self.make_actor()
             self.busywait(lambda: proc_mock.call_args is not None)
-            self.shutdown_actor.cancel_shutdown("test").get(self.TIMEOUT)
+            self.shutdown_actor.cancel_shutdown("test")
             self.check_success_flag(False, 2)
-            self.assertEqual(proc_mock.call_args_list,
-                             [mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']),
-                              mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
-                              mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
-                              mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME'])])
+            self.assertEqual(proc_mock.call_args_list[0], mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']))
+            self.assertEqual(proc_mock.call_args_list[-1], mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME']))
+
         finally:
             self.shutdown_actor.actor_ref.stop()
 
@@ -88,10 +93,10 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n', 'idle\n'])
         self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
         self.make_actor()
-        self.check_success_flag(False, 2)
+        self.check_success_flag(False, 5)
 
     def test_issue_slurm_drain_retry(self, proc_mock):
-        proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
+        proc_mock.side_effect = iter([OSError, OSError, 'drng\n', 'drain\n'])
         self.check_success_after_reset(proc_mock, timer=False)
 
     def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
index f714c3c8b38761c0f38e46b70d7a12fd5124bc80..1efa1ffeb35199c251d13e217f2cb37c146c4622 100644 (file)
@@ -21,6 +21,15 @@ import logging
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
+
+    def busywait(self, f):
+        n = 0
+        while not f() and n < 200:
+            time.sleep(.1)
+            self.daemon.ping().get(self.TIMEOUT)
+            n += 1
+        self.assertTrue(f())
+
     def mock_node_start(self, **kwargs):
         # Make sure that every time the daemon starts a setup actor,
         # it gets a new mock object back.
@@ -102,14 +111,16 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
 
     def monitor_list(self):
-        return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
+        return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
 
-    def monitored_arvados_nodes(self):
+    def monitored_arvados_nodes(self, include_unpaired=True):
         pairings = []
         for future in [actor.proxy().arvados_node
                        for actor in self.monitor_list()]:
             try:
-                pairings.append(future.get(self.TIMEOUT))
+                g = future.get(self.TIMEOUT)
+                if g or include_unpaired:
+                    pairings.append(g)
             except pykka.ActorDeadError:
                 pass
         return pairings
@@ -117,6 +128,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def alive_monitor_count(self):
         return len(self.monitored_arvados_nodes())
 
+    def paired_monitor_count(self):
+        return len(self.monitored_arvados_nodes(False))
+
     def assertShutdownCancellable(self, expected=True):
         self.assertTrue(self.node_shutdown.start.called)
         self.assertIs(expected,
@@ -126,17 +140,16 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_easy_node_creation(self):
         size = testutil.MockSize(1)
         self.make_daemon(want_sizes=[size])
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: self.node_setup.start.called)
 
     def check_monitors_arvados_nodes(self, *arv_nodes):
+        self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
         self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
 
     def test_node_pairing(self):
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon([cloud_node], [arv_node])
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_node_pairing_after_arvados_update(self):
@@ -145,7 +158,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          [testutil.arvados_node_mock(1, ip_address=None)])
         arv_node = testutil.arvados_node_mock(2)
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_arvados_node_un_and_re_paired(self):
@@ -157,9 +169,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
         self.check_monitors_arvados_nodes(arv_node)
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.assertEqual(0, self.alive_monitor_count())
+        self.busywait(lambda: 0 == self.alive_monitor_count())
         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_old_arvados_node_not_double_assigned(self):
@@ -179,8 +190,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_node_count_satisfied(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
                          want_sizes=[testutil.MockSize(1)])
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_dont_count_missing_as_busy(self):
         size = testutil.MockSize(1)
@@ -191,8 +201,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                             2,
                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size, size])
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: 2 == self.alive_monitor_count())
+        self.busywait(lambda: self.node_setup.start.called)
 
     def test_missing_counts_towards_max(self):
         size = testutil.MockSize(1)
@@ -202,8 +212,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size, size],
                          max_nodes=2)
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_excess_counts_missing(self):
         size = testutil.MockSize(1)
@@ -212,7 +221,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.busywait(lambda: 2 == self.paired_monitor_count())
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -224,7 +233,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-
+        self.busywait(lambda: 2 == self.paired_monitor_count())
         get_cloud_node = mock.MagicMock(name="get_cloud_node")
         get_cloud_node.get.return_value = cloud_nodes[1]
         mock_node_monitor = mock.MagicMock()
@@ -233,10 +242,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
 
-        self.assertEqual(2, self.alive_monitor_count())
+        self.busywait(lambda: 2 == self.alive_monitor_count())
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
-        self.assertEqual(1, self.node_shutdown.start.call_count)
+        self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
 
     def test_booting_nodes_counted(self):
         cloud_node = testutil.cloud_node_mock(1)
@@ -246,17 +255,15 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.max_nodes.get(self.TIMEOUT)
         self.assertTrue(self.node_setup.start.called)
         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
 
     def test_boot_new_node_when_all_nodes_busy(self):
         size = testutil.MockSize(2)
         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
                          [size], avail_sizes=[(size, {"cores":1})])
+        self.busywait(lambda: 1 == self.paired_monitor_count())
         self.busywait(lambda: self.node_setup.start.called)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
 
     def test_boot_new_node_below_min_nodes(self):
         min_size = testutil.MockSize(1)
@@ -402,7 +409,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         now = time.time()
         self.monitor_list()[0].tell_proxy().consider_shutdown()
         self.busywait(lambda: self.node_shutdown.start.called)
-        self.stop_proxy(self.daemon)
         self.assertShutdownCancellable(False)
 
     def test_booted_node_shut_down_when_never_paired(self):
@@ -414,7 +420,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_cloud_nodes([cloud_node])
         self.monitor_list()[0].tell_proxy().consider_shutdown()
         self.busywait(lambda: self.node_shutdown.start.called)
-        self.stop_proxy(self.daemon)
         self.assertShutdownCancellable(False)
 
     def test_booted_node_shut_down_when_never_working(self):
@@ -427,7 +432,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
         self.daemon.update_cloud_nodes([cloud_node])
         self.busywait(lambda: self.node_shutdown.start.called)
-        self.stop_proxy(self.daemon)
         self.assertShutdownCancellable(False)
 
     def test_node_that_pairs_not_considered_failed_boot(self):
@@ -457,8 +461,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_booting_nodes_shut_down(self):
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
+        self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
 
     def test_all_booting_nodes_tried_to_shut_down(self):
         size = testutil.MockSize(2)
@@ -483,7 +486,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(1)
         size = testutil.MockSize(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.paired_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -493,7 +496,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.paired_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -501,11 +504,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_shutdown_accepted_below_capacity(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_shutdown.start.called)
+        self.busywait(lambda: self.node_shutdown.start.called)
 
     def test_shutdown_declined_when_idle_and_job_queued(self):
         size = testutil.MockSize(1)
@@ -513,7 +515,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                      testutil.arvados_node_mock(4, job_uuid=None)]
         self.make_daemon(cloud_nodes, arv_nodes, [size])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.busywait(lambda: 2 == self.paired_monitor_count())
         for mon_ref in self.monitor_list():
             monitor = mon_ref.proxy()
             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
@@ -532,13 +534,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = False
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.paired_monitor_count())
 
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = True
         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.assertEqual(0, self.alive_monitor_count())
+        self.busywait(lambda: 0 == self.paired_monitor_count())
 
     def test_nodes_shutting_down_replaced_below_max_nodes(self):
         size = testutil.MockSize(6)
@@ -551,21 +553,19 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.node_shutdown.start.called)
         self.daemon.update_server_wishlist(
             [testutil.MockSize(6)]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: self.node_setup.start.called)
 
     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
         cloud_node = testutil.cloud_node_mock(7)
         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                          max_nodes=1)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.paired_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.assertTrue(self.node_shutdown.start.called)
         self.daemon.update_server_wishlist(
             [testutil.MockSize(7)]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_nodes_shutting_down_count_against_excess(self):
         size = testutil.MockSize(8)
@@ -573,7 +573,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
         self.make_daemon(cloud_nodes, arv_nodes, [size],
                          avail_sizes=[(size, {"cores":1})])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.busywait(lambda: 2 == self.paired_monitor_count())
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -598,8 +598,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         size = testutil.MockSize(2)
         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
         self.timer.deliver()
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
 
     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
@@ -607,9 +606,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(
-            1, self.last_shutdown.stop.call_count)
+        self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
 
     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
@@ -620,8 +617,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         # the ActorDeadError.
         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.last_shutdown.stop.call_count)
+        self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
 
     def test_node_create_two_sizes(self):
         small = testutil.MockSize(1)
@@ -675,7 +671,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                         testutil.arvados_node_mock(3)],
                          want_sizes=[small, small, big],
                          avail_sizes=avail_sizes)
-
+        self.busywait(lambda: 3 == self.paired_monitor_count())
         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
 
         self.assertEqual(0, self.node_shutdown.start.call_count)
@@ -686,10 +682,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         booting = self.daemon.booting.get()
         cloud_nodes = self.daemon.cloud_nodes.get()
 
-        self.stop_proxy(self.daemon)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
 
-        self.assertEqual(1, self.node_setup.start.call_count)
-        self.assertEqual(1, self.node_shutdown.start.call_count)
+        self.stop_proxy(self.daemon)
 
         # booting a new big node
         sizecounts = {a[0].id: 0 for a in avail_sizes}
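
Editorial note on the test_daemon.py hunks above: these tests used to assert on mock call counts immediately after messaging the pykka daemon proxy, which races with asynchronous actor message delivery; the diff swaps those assertions for polling. The sketch below is a minimal, self-contained illustration of that polling-assertion idea (it mirrors the busywait helper defined in the testutil.py hunk later in this diff, but the names here are illustrative, not the project's fixtures):

    import threading
    import time
    import unittest
    from unittest import mock

    def busywait(predicate, timeout=2.0, interval=0.1):
        # Poll until predicate() is truthy or the timeout elapses.
        deadline = time.time() + timeout
        while not predicate() and time.time() < deadline:
            time.sleep(interval)
        return predicate()

    class PollingAssertionExample(unittest.TestCase):
        def test_async_side_effect(self):
            start = mock.Mock()
            # Simulate an actor thread invoking the mock a moment later.
            threading.Timer(0.2, start).start()
            # An immediate assertTrue(start.called) could run before the call
            # lands; polling tolerates the scheduling delay.
            self.assertTrue(busywait(lambda: start.called))

    if __name__ == '__main__':
        unittest.main()
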
index cfac61ba2eaf64c67cf44aeb298f2caf4d9b1f86..ef4423dafaf7762b5d8a8c95fdbaacf630156917 100644 (file)
@@ -19,8 +19,8 @@ from . import testutil
 import arvnodeman.baseactor
 
 class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
-    def __init__(self, e):
-        super(BogusActor, self).__init__()
+    def __init__(self, e, killfunc=None):
+        super(BogusActor, self).__init__(killfunc=killfunc)
         self.exp = e
 
     def doStuff(self):
@@ -29,30 +29,35 @@ class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
     def ping(self):
         # Called by WatchdogActorTest, this delay is longer than the test timeout
         # of 1 second, which should cause the watchdog ping to fail.
-        time.sleep(4)
+        time.sleep(2)
         return True
 
 class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
     def test_fatal_error(self):
         for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
-            with mock.patch('os.kill') as kill_mock:
-                act = BogusActor.start(e).tell_proxy()
-                act.doStuff()
-                act.actor_ref.stop(block=True)
-                self.assertTrue(kill_mock.called)
-
-    @mock.patch('os.kill')
-    def test_nonfatal_error(self, kill_mock):
-        act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+            kill_mock = mock.Mock('os.kill')
+            bgact = BogusActor.start(e, killfunc=kill_mock)
+            act_thread = bgact.proxy().get_thread().get()
+            act = bgact.tell_proxy()
+            act.doStuff()
+            act.actor_ref.stop(block=True)
+            act_thread.join()
+            self.assertTrue(kill_mock.called)
+
+    def test_nonfatal_error(self):
+        kill_mock = mock.Mock('os.kill')
+        act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
         act.doStuff()
         act.actor_ref.stop(block=True)
         self.assertFalse(kill_mock.called)
 
 class WatchdogActorTest(testutil.ActorTestMixin, unittest.TestCase):
-    @mock.patch('os.kill')
-    def test_time_timout(self, kill_mock):
+
+    def test_time_timout(self):
+        kill_mock = mock.Mock('os.kill')
         act = BogusActor.start(OSError(errno.ENOENT, ""))
-        watch = arvnodeman.baseactor.WatchdogActor.start(1, act)
+        watch = arvnodeman.baseactor.WatchdogActor.start(1, act, killfunc=kill_mock)
+        time.sleep(1)
         watch.stop(block=True)
         act.stop(block=True)
         self.assertTrue(kill_mock.called)
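
The BogusActor/WatchdogActor changes above replace a global mock.patch('os.kill') with an injected killfunc. A hypothetical sketch of that dependency-injection pattern follows; FatalErrorGuard is an illustrative stand-in, not BaseNodeManagerActor, and only MemoryError is treated as fatal in this toy version:

    import os
    import signal
    from unittest import mock

    class FatalErrorGuard:
        """Toy stand-in: the kill function is a constructor argument."""
        def __init__(self, killfunc=os.kill):
            self._kill = killfunc

        def handle(self, exc):
            # Only memory exhaustion is treated as fatal in this toy example.
            if isinstance(exc, MemoryError):
                self._kill(os.getpid(), signal.SIGKILL)

    # Production code uses the default (os.kill); a test injects a mock and
    # asserts on it without patching the os module globally.
    guard = FatalErrorGuard(killfunc=mock.Mock())
    guard.handle(MemoryError())
    assert guard._kill.called

    guard = FatalErrorGuard(killfunc=mock.Mock())
    guard.handle(OSError(2, "No such file"))
    assert not guard._kill.called
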
index 8aa0835aca395af7594659e6c836e6245d189494..b1d5e002767a000d7487aa82c8ee5bb9c312e320 100644 (file)
@@ -24,63 +24,69 @@ class ServerCalculatorTestCase(unittest.TestCase):
 
     def test_empty_queue_needs_no_servers(self):
         servcalc = self.make_calculator([1])
-        self.assertEqual([], servcalc.servers_for_queue([]))
+        self.assertEqual(([], {}), servcalc.servers_for_queue([]))
 
     def test_easy_server_count(self):
         servcalc = self.make_calculator([1])
-        servlist = self.calculate(servcalc, {'min_nodes': 3})
+        servlist, _ = self.calculate(servcalc, {'min_nodes': 3})
         self.assertEqual(3, len(servlist))
 
     def test_default_5pct_ram_value_decrease(self):
         servcalc = self.make_calculator([1])
-        servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
+        servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
         self.assertEqual(0, len(servlist))
-        servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 121})
+        servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 121})
         self.assertEqual(1, len(servlist))
 
     def test_custom_node_mem_scaling_factor(self):
         # Simulate a custom 'node_mem_scaling' config parameter by passing
         # the value to ServerCalculator
         servcalc = self.make_calculator([1], node_mem_scaling=0.5)
-        servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
+        servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
         self.assertEqual(0, len(servlist))
-        servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 64})
+        servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 64})
         self.assertEqual(1, len(servlist))
 
     def test_implicit_server_count(self):
         servcalc = self.make_calculator([1])
-        servlist = self.calculate(servcalc, {}, {'min_nodes': 3})
+        servlist, _ = self.calculate(servcalc, {}, {'min_nodes': 3})
         self.assertEqual(4, len(servlist))
 
     def test_bad_min_nodes_override(self):
         servcalc = self.make_calculator([1])
-        servlist = self.calculate(servcalc,
-                                  {'min_nodes': -2}, {'min_nodes': 'foo'})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_nodes': -2}, {'min_nodes': 'foo'})
         self.assertEqual(2, len(servlist))
 
-    def test_ignore_unsatisfiable_jobs(self):
+    def test_ignore_and_return_unsatisfiable_jobs(self):
         servcalc = self.make_calculator([1], max_nodes=9)
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 2},
-                                  {'min_ram_mb_per_node': 256},
-                                  {'min_nodes': 6},
-                                  {'min_nodes': 12},
-                                  {'min_scratch_mb_per_node': 300000})
+        servlist, u_jobs = self.calculate(servcalc,
+                                          {'min_cores_per_node': 2},
+                                          {'min_ram_mb_per_node': 256},
+                                          {'min_nodes': 6},
+                                          {'min_nodes': 12},
+                                          {'min_scratch_mb_per_node': 300000})
         self.assertEqual(6, len(servlist))
+        # Only unsatisfiable jobs are returned in u_jobs
+        self.assertIn('zzzzz-jjjjj-000000000000000', u_jobs.keys())
+        self.assertIn('zzzzz-jjjjj-000000000000001', u_jobs.keys())
+        self.assertNotIn('zzzzz-jjjjj-000000000000002', u_jobs.keys())
+        self.assertIn('zzzzz-jjjjj-000000000000003', u_jobs.keys())
+        self.assertIn('zzzzz-jjjjj-000000000000004', u_jobs.keys())
 
     def test_ignore_too_expensive_jobs(self):
         servcalc = self.make_calculator([1, 2], max_nodes=12, max_price=6)
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 1, 'min_nodes': 6})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 1, 'min_nodes': 6})
         self.assertEqual(6, len(servlist))
 
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 2, 'min_nodes': 6})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 2, 'min_nodes': 6})
         self.assertEqual(0, len(servlist))
 
     def test_job_requesting_max_nodes_accepted(self):
         servcalc = self.make_calculator([1], max_nodes=4)
-        servlist = self.calculate(servcalc, {'min_nodes': 4})
+        servlist, _ = self.calculate(servcalc, {'min_nodes': 4})
         self.assertEqual(4, len(servlist))
 
     def test_cheapest_size(self):
@@ -89,37 +95,37 @@ class ServerCalculatorTestCase(unittest.TestCase):
 
     def test_next_biggest(self):
         servcalc = self.make_calculator([1, 2, 4, 8])
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 3},
-                                  {'min_cores_per_node': 6})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 3},
+                                     {'min_cores_per_node': 6})
         self.assertEqual([servcalc.cloud_sizes[2].id,
                           servcalc.cloud_sizes[3].id],
                          [s.id for s in servlist])
 
     def test_multiple_sizes(self):
         servcalc = self.make_calculator([1, 2])
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 2},
-                                  {'min_cores_per_node': 1},
-                                  {'min_cores_per_node': 1})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 2},
+                                     {'min_cores_per_node': 1},
+                                     {'min_cores_per_node': 1})
         self.assertEqual([servcalc.cloud_sizes[1].id,
                           servcalc.cloud_sizes[0].id,
                           servcalc.cloud_sizes[0].id],
                          [s.id for s in servlist])
 
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 1},
-                                  {'min_cores_per_node': 2},
-                                  {'min_cores_per_node': 1})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 1},
+                                     {'min_cores_per_node': 2},
+                                     {'min_cores_per_node': 1})
         self.assertEqual([servcalc.cloud_sizes[0].id,
                           servcalc.cloud_sizes[1].id,
                           servcalc.cloud_sizes[0].id],
                          [s.id for s in servlist])
 
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 1},
-                                  {'min_cores_per_node': 1},
-                                  {'min_cores_per_node': 2})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 1},
+                                     {'min_cores_per_node': 1},
+                                     {'min_cores_per_node': 2})
         self.assertEqual([servcalc.cloud_sizes[0].id,
                           servcalc.cloud_sizes[0].id,
                           servcalc.cloud_sizes[1].id],
@@ -131,16 +137,38 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                    unittest.TestCase):
     TEST_CLASS = jobqueue.JobQueueMonitorActor
 
+
     class MockCalculator(object):
         @staticmethod
         def servers_for_queue(queue):
-            return [testutil.MockSize(n) for n in queue]
+            return ([testutil.MockSize(n) for n in queue], {})
+
+
+    class MockCalculatorUnsatisfiableJobs(object):
+        @staticmethod
+        def servers_for_queue(queue):
+            return ([], {k["uuid"]: "Unsatisfiable job mock" for k in queue})
 
 
     def build_monitor(self, side_effect, *args, **kwargs):
         super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs)
         self.client.jobs().queue().execute.side_effect = side_effect
 
+    @mock.patch("subprocess.check_call")
+    @mock.patch("subprocess.check_output")
+    def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
+        job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
+        container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
+        mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
+
+        self.build_monitor([{'items': [{'uuid': job_uuid}]}],
+                           self.MockCalculatorUnsatisfiableJobs(), True, True)
+        self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+        self.monitor.ping().get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
+        self.client.jobs().cancel.assert_called_with(uuid=job_uuid)
+        mock_scancel.assert_called_with(['scancel', '--name='+container_uuid])
+
     @mock.patch("subprocess.check_output")
     def test_subscribers_get_server_lists(self, mock_squeue):
         mock_squeue.return_value = ""
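
The jobqueue tests above reflect a signature change: ServerCalculator.servers_for_queue() now returns a (server_list, unsatisfiable_jobs) pair, and the monitor cancels jobs that can never be scheduled. Below is a hedged sketch of how a caller might consume that pair; plan_wishlist, EchoCalculator, and cancel_job are illustrative stand-ins for the real monitor's Arvados API call and scancel invocation:

    def plan_wishlist(calculator, queue, cancel_job):
        """Return the server wishlist; cancel jobs reported as unsatisfiable."""
        servers, unsatisfiable = calculator.servers_for_queue(queue)
        for uuid, reason in unsatisfiable.items():
            cancel_job(uuid, reason)
        return servers

    class EchoCalculator:
        # Mirrors MockCalculatorUnsatisfiableJobs above: nothing is schedulable.
        @staticmethod
        def servers_for_queue(queue):
            return ([], {j['uuid']: 'Unsatisfiable job mock' for j in queue})

    cancelled = []
    wishlist = plan_wishlist(EchoCalculator(),
                             [{'uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}],
                             lambda uuid, reason: cancelled.append(uuid))
    assert wishlist == [] and cancelled == ['zzzzz-8i9sb-zzzzzzzzzzzzzzz']
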
index a18eff3a5955b0dc0faf7c7bfcbf3b7192633a6c..a236e4f0eecd1d2ba30c3e01adc227e245aba03e 100644 (file)
@@ -6,6 +6,7 @@
 from __future__ import absolute_import, print_function
 from future import standard_library
 
+import json
 import requests
 import unittest
 
@@ -14,10 +15,15 @@ import arvnodeman.config as config
 
 
 class TestServer(object):
+    def __init__(self, management_token=None):
+        self.mgmt_token = management_token
+
     def __enter__(self):
         cfg = config.NodeManagerConfig()
         cfg.set('Manage', 'port', '0')
         cfg.set('Manage', 'address', '127.0.0.1')
+        if self.mgmt_token != None:
+            cfg.set('Manage', 'ManagementToken', self.mgmt_token)
         self.srv = status.Server(cfg)
         self.srv.start()
         addr, port = self.srv.server_address
@@ -33,6 +39,11 @@ class TestServer(object):
     def get_status(self):
         return self.get_status_response().json()
 
+    def get_healthcheck_ping(self, auth_header=None):
+        headers = {}
+        if auth_header != None:
+            headers['Authorization'] = auth_header
+        return requests.get(self.srv_base+'/_health/ping', headers=headers)
 
 class StatusServerUpdates(unittest.TestCase):
     def test_updates(self):
@@ -56,3 +67,32 @@ class StatusServerDisabled(unittest.TestCase):
         self.srv.start()
         self.assertFalse(self.srv.enabled)
         self.assertFalse(getattr(self.srv, '_thread', False))
+
+class HealthcheckPing(unittest.TestCase):
+    def test_ping_disabled(self):
+        with TestServer() as srv:
+            r = srv.get_healthcheck_ping()
+            self.assertEqual(404, r.status_code)
+
+    def test_ping_no_auth(self):
+        with TestServer('configuredmanagementtoken') as srv:
+            r = srv.get_healthcheck_ping()
+            self.assertEqual(401, r.status_code)
+
+    def test_ping_bad_auth_format(self):
+        with TestServer('configuredmanagementtoken') as srv:
+            r = srv.get_healthcheck_ping('noBearer')
+            self.assertEqual(403, r.status_code)
+
+    def test_ping_bad_auth_token(self):
+        with TestServer('configuredmanagementtoken') as srv:
+            r = srv.get_healthcheck_ping('Bearer badtoken')
+            self.assertEqual(403, r.status_code)
+
+    def test_ping_success(self):
+        with TestServer('configuredmanagementtoken') as srv:
+            r = srv.get_healthcheck_ping('Bearer configuredmanagementtoken')
+            self.assertEqual(200, r.status_code)
+            self.assertEqual('application/json', r.headers['content-type'])
+            resp = r.json()
+            self.assertEqual('{"health": "OK"}', json.dumps(resp))
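
The HealthcheckPing tests above pin down the management endpoint's status-code contract. A small, hedged sketch of that contract (health_ping_response is an illustrative function, not the nodemanager status server's actual handler):

    import json

    def health_ping_response(configured_token, auth_header):
        """Status codes the tests above expect from GET /_health/ping."""
        if not configured_token:
            return 404, None                      # ManagementToken unset: disabled
        if not auth_header:
            return 401, None                      # no Authorization header
        if auth_header != 'Bearer ' + configured_token:
            return 403, None                      # wrong scheme or wrong token
        return 200, json.dumps({"health": "OK"})  # authorized

    assert health_ping_response(None, None)[0] == 404
    assert health_ping_response('secret', None)[0] == 401
    assert health_ping_response('secret', 'noBearer')[0] == 403
    assert health_ping_response('secret', 'Bearer secret') == (200, '{"health": "OK"}')
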
index cee7fe1c335247b6aba73df9abed3362b53c402c..21a9b5ac778651c58084b31001bda8c56a9ef9ed 100644 (file)
@@ -26,27 +26,29 @@ class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
 
     def test_delayed_turnaround(self):
         receiver = mock.Mock()
-        with mock.patch('time.time', return_value=0) as mock_now:
-            deliverer = timedcallback.TimedCallBackActor.start().proxy()
-            deliverer.schedule(1, receiver, 'delayed')
-            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
-            self.assertFalse(receiver.called)
-            mock_now.return_value = 2
-            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
-            self.stop_proxy(deliverer)
+        mock_now = mock.Mock()
+        mock_now.return_value = 0
+        deliverer = timedcallback.TimedCallBackActor.start(timefunc=mock_now).proxy()
+        deliverer.schedule(1, receiver, 'delayed')
+        deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+        self.assertFalse(receiver.called)
+        mock_now.return_value = 2
+        deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+        self.stop_proxy(deliverer)
         receiver.assert_called_with('delayed')
 
     def test_out_of_order_scheduling(self):
         receiver = mock.Mock()
-        with mock.patch('time.time', return_value=1.5) as mock_now:
-            deliverer = timedcallback.TimedCallBackActor.start().proxy()
-            deliverer.schedule(2, receiver, 'second')
-            deliverer.schedule(1, receiver, 'first')
-            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
-            receiver.assert_called_with('first')
-            mock_now.return_value = 2.5
-            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
-            self.stop_proxy(deliverer)
+        mock_now = mock.Mock()
+        mock_now.return_value = 1.5
+        deliverer = timedcallback.TimedCallBackActor.start(timefunc=mock_now).proxy()
+        deliverer.schedule(2, receiver, 'second')
+        deliverer.schedule(1, receiver, 'first')
+        deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+        receiver.assert_called_with('first')
+        mock_now.return_value = 2.5
+        deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+        self.stop_proxy(deliverer)
         receiver.assert_called_with('second')
 
     def test_dead_actors_ignored(self):
@@ -61,4 +63,3 @@ class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
 
 if __name__ == '__main__':
     unittest.main()
-
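
The TimedCallBackActor tests above stop patching time.time and instead pass a timefunc argument, so the test drives the clock explicitly. A self-contained sketch of the injectable-clock pattern; Scheduler is a toy stand-in, not the actor:

    import time

    class Scheduler:
        """Toy stand-in: the clock is injected, defaulting to time.time."""
        def __init__(self, timefunc=time.time):
            self._now = timefunc
            self._pending = []   # list of (deliver_at, callback, payload)

        def schedule(self, deliver_at, callback, payload):
            self._pending.append((deliver_at, callback, payload))
            self._deliver_due()

        def _deliver_due(self):
            # Deliver anything whose time has come; keep the rest pending.
            due = [p for p in self._pending if p[0] <= self._now()]
            self._pending = [p for p in self._pending if p[0] > self._now()]
            for _, callback, payload in due:
                callback(payload)

    # A test advances the clock by hand instead of mock.patch('time.time').
    fake_now = [0]
    delivered = []
    s = Scheduler(timefunc=lambda: fake_now[0])
    s.schedule(1, delivered.append, 'delayed')
    assert delivered == []            # clock still at 0
    fake_now[0] = 2
    s.schedule(3, delivered.append, 'failure')
    assert delivered == ['delayed']   # only the due callback fired
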
index 0a483709adf5b69a8fd13647f9e9bd866fa836cf..6e134375bb8aec05bdd71f830e28f277d3cff5b5 100644 (file)
@@ -123,7 +123,10 @@ class ActorTestMixin(object):
         pykka.ActorRegistry.stop_all()
 
     def stop_proxy(self, proxy):
-        return proxy.actor_ref.stop(timeout=self.TIMEOUT)
+        th = proxy.get_thread().get()
+        t = proxy.actor_ref.stop(timeout=self.TIMEOUT)
+        th.join()
+        return t
 
     def wait_for_assignment(self, proxy, attr_name, unassigned=None,
                             timeout=TIMEOUT):
@@ -136,11 +139,13 @@ class ActorTestMixin(object):
             if result is not unassigned:
                 return result
 
-    def busywait(self, f):
+    def busywait(self, f, finalize=None):
         n = 0
-        while not f() and n < 10:
+        while not f() and n < 20:
             time.sleep(.1)
             n += 1
+        if finalize is not None:
+            finalize()
         self.assertTrue(f())
 
 
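The testutil changes above make stop_proxy join the actor's thread and give busywait a finalize hook, so assertions made after stopping an actor observe everything the actor processed before it stopped. A toy, self-contained illustration of why the join matters; Worker stands in for a pykka actor and is not part of the project:

    import queue
    import threading

    class Worker:
        """Toy stand-in for an actor: processes messages on its own thread."""
        def __init__(self):
            self.calls = []
            self._q = queue.Queue()
            self._thread = threading.Thread(target=self._run)
            self._thread.start()

        def _run(self):
            while True:
                msg = self._q.get()
                if msg is None:
                    return
                self.calls.append(msg)

        def tell(self, msg):
            self._q.put(msg)        # asynchronous, like proxy.tell()

        def stop(self):
            self._q.put(None)
            # Joining is the point: once stop() returns, every message sent
            # before it has been processed, so later assertions are stable.
            self._thread.join()

    w = Worker()
    w.tell('ping')
    w.stop()
    assert w.calls == ['ping']
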
index edeb647e4628e675be696cb68f4b61892b4cc606..cfb828b2a5d84c6d16407866374e1f4900185f84 100644 (file)
@@ -248,7 +248,8 @@ func (ps *pgEventSource) DB() *sql.DB {
 }
 
 func (ps *pgEventSource) DBHealth() error {
-       ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+       defer cancel()
        var i int
        return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
 }
index f9f7f53edc58430f231e9a52d5d95bb1a025084a..d527c39ba1c4eeb12c0cbae63526150da27f096d 100644 (file)
@@ -60,6 +60,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // Receive websocket frames from the client and pass them to
        // sess.Receive().
        go func() {
+               defer cancel()
                buf := make([]byte, 2<<20)
                for {
                        select {
@@ -75,16 +76,14 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                err = errFrameTooBig
                        }
                        if err != nil {
-                               if err != io.EOF {
+                               if err != io.EOF && ctx.Err() == nil {
                                        log.WithError(err).Info("read error")
                                }
-                               cancel()
                                return
                        }
                        err = sess.Receive(buf)
                        if err != nil {
                                log.WithError(err).Error("sess.Receive() failed")
-                               cancel()
                                return
                        }
                }
@@ -94,6 +93,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // sess.EventMessage() as needed, and send them to the client
        // as websocket frames.
        go func() {
+               defer cancel()
                for {
                        var ok bool
                        var data interface{}
@@ -119,8 +119,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                buf, err = sess.EventMessage(e)
                                if err != nil {
                                        log.WithError(err).Error("EventMessage failed")
-                                       cancel()
-                                       break
+                                       return
                                } else if len(buf) == 0 {
                                        log.Debug("skip")
                                        continue
@@ -135,9 +134,10 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                        t0 := time.Now()
                        _, err = ws.Write(buf)
                        if err != nil {
-                               log.WithError(err).Error("write failed")
-                               cancel()
-                               break
+                               if ctx.Err() == nil {
+                                       log.WithError(err).Error("write failed")
+                               }
+                               return
                        }
                        log.Debug("sent")
 
@@ -159,6 +159,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // is done/cancelled or the incoming event stream ends. Shut
        // down the handler if the outgoing queue fills up.
        go func() {
+               defer cancel()
                ticker := time.NewTicker(h.PingTimeout)
                defer ticker.Stop()
 
@@ -178,10 +179,8 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                        default:
                                        }
                                }
-                               continue
                        case e, ok := <-incoming.Channel():
                                if !ok {
-                                       cancel()
                                        return
                                }
                                if !sess.Filter(e) {
@@ -191,7 +190,6 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                case queue <- e:
                                default:
                                        log.WithError(errQueueFull).Error("terminate")
-                                       cancel()
                                        return
                                }
                        }
index 58c64231cb53c1204ceed70b0ea030a7050ebb95..b57abb5cac31b0d563528669514e3bdcee151327 100644 (file)
@@ -8,6 +8,7 @@ import (
        "database/sql"
        "encoding/json"
        "errors"
+       "reflect"
        "sync"
        "sync/atomic"
        "time"
@@ -86,6 +87,24 @@ func (sess *v0session) Receive(buf []byte) error {
                sess.mtx.Unlock()
                sub.sendOldEvents(sess)
                return nil
+       } else if sub.Method == "unsubscribe" {
+               sess.mtx.Lock()
+               found := false
+               for i, s := range sess.subscriptions {
+                       if !reflect.DeepEqual(s.Filters, sub.Filters) {
+                               continue
+                       }
+                       copy(sess.subscriptions[i:], sess.subscriptions[i+1:])
+                       sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1]
+                       found = true
+                       break
+               }
+               sess.mtx.Unlock()
+               sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe")
+               if found {
+                       sess.sendq <- v0subscribeOK
+                       return nil
+               }
        } else {
                sess.log.WithField("Method", sub.Method).Info("unknown method")
        }
@@ -205,6 +224,10 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        // client will probably reconnect and do the
                        // same thing all over again.
                        time.Sleep(100 * time.Millisecond)
+                       if sess.ws.Request().Context().Err() != nil {
+                               // Session terminated while we were sleeping
+                               return
+                       }
                }
                now := time.Now()
                e := &event{
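
The session_v0.go hunks above add an "unsubscribe" method that removes the first subscription whose filters match exactly, replying with status 200 on success and 400 otherwise. Below is a hedged sketch of the client-side JSON frames implied by that protocol; the filter format mirrors the Go test later in this diff, and actually sending the frames over a websocket connection is omitted:

    import json

    # Filters must match an earlier "subscribe" exactly for "unsubscribe" to
    # succeed.
    subscribe = json.dumps({
        "method": "subscribe",
        "filters": [["event_type", "in", ["update"]]],
    })
    unsubscribe = json.dumps({
        "method": "unsubscribe",
        "filters": [["event_type", "in", ["update"]]],
    })
    print(subscribe)    # sent as a text frame; server replies with status 200
    print(unsubscribe)  # 200 if a matching subscription existed, else 400
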
index 9f743e0b5e3d58312d2b3a2636b148bd493b51e0..7585bc5e17e017dc095a141d550e4e609c877c94 100644 (file)
@@ -71,15 +71,28 @@ func (s *v0Suite) TestFilters(c *check.C) {
        conn, r, w := s.testClient()
        defer conn.Close()
 
-       c.Check(w.Encode(map[string]interface{}{
-               "method":  "subscribe",
-               "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
-       }), check.IsNil)
-       s.expectStatus(c, r, 200)
+       cmd := func(method, eventType string, status int) {
+               c.Check(w.Encode(map[string]interface{}{
+                       "method":  method,
+                       "filters": [][]interface{}{{"event_type", "in", []string{eventType}}},
+               }), check.IsNil)
+               s.expectStatus(c, r, status)
+       }
+       cmd("subscribe", "update", 200)
+       cmd("subscribe", "update", 200)
+       cmd("subscribe", "create", 200)
+       cmd("subscribe", "update", 200)
+       cmd("unsubscribe", "blip", 400)
+       cmd("unsubscribe", "create", 200)
+       cmd("unsubscribe", "update", 200)
 
        go s.emitEvents(nil)
        lg := s.expectLog(c, r)
        c.Check(lg.EventType, check.Equals, "update")
+
+       cmd("unsubscribe", "update", 200)
+       cmd("unsubscribe", "update", 200)
+       cmd("unsubscribe", "update", 400)
 }
 
 func (s *v0Suite) TestLastLogID(c *check.C) {
index b21e49e3539a3f8af1cbfe4a0d8ff0709c96f1fc..db9b64887fe325fd25fed9d452acf449524f9a1e 100644 (file)
@@ -65,14 +65,20 @@ RUN cd /root && \
     GOPATH=$PWD go get github.com/curoverse/runsvinit && \
     install bin/runsvinit /usr/local/bin
 
+ENV PJSVERSION=1.9.7
+
 RUN set -e && \
- PJS=phantomjs-1.9.7-linux-x86_64 && \
- curl -L -o/tmp/$PJS.tar.bz2 http://cache.arvados.org/$PJS.tar.bz2 && \
- tar -C /usr/local -xjf /tmp/$PJS.tar.bz2 && \
- ln -s ../$PJS/bin/phantomjs /usr/local/bin/
+ curl -L -f http://cache.arvados.org/phantomjs-${PJSVERSION}-linux-x86_64.tar.bz2 | tar -C /usr/local -xjf - && \
+ ln -s ../phantomjs-${PJSVERSION}-linux-x86_64/bin/phantomjs /usr/local/bin
 
 RUN pip install -U setuptools
 
+ENV NODEVERSION v6.11.2
+
+# Install nodejs binary
+RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
+    ln -s ../node-${NODEVERSION}-linux-x64/bin/node ../node-${NODEVERSION}-linux-x64/bin/npm /usr/local/bin
+
 ARG arvados_version
 RUN echo arvados_version is git commit $arvados_version
 
index 00fa7022ba1c1f595c67f1fc07b9095c4894d450..a70e4b2d0354ab9adf82097f9bd17e0a55c8e8de 100644 (file)
@@ -16,9 +16,11 @@ class ArgumentParser(argparse.ArgumentParser):
             description='Summarize resource usage of an Arvados Crunch job')
         src = self.add_mutually_exclusive_group()
         src.add_argument(
-            '--job', type=str, metavar='UUID',
-            help='Look up the specified job and read its log data from Keep'
-            ' (or from the Arvados event log, if the job is still running)')
+            '--job', '--container', '--container-request',
+            type=str, metavar='UUID',
+            help='Look up the specified job, container, or container request '
+            'and read its log data from Keep (or from the Arvados event log, '
+            'if the job is still running)')
         src.add_argument(
             '--pipeline-instance', type=str, metavar='UUID',
             help='Summarize each component of the given pipeline instance')
@@ -31,6 +33,9 @@ class ArgumentParser(argparse.ArgumentParser):
         self.add_argument(
             '--format', type=str, choices=('html', 'text'), default='text',
             help='Report format')
+        self.add_argument(
+            '--threads', type=int, default=8,
+            help='Maximum worker threads to run')
         self.add_argument(
             '--verbose', '-v', action='count', default=0,
             help='Log more information (once for progress, twice for debug)')
@@ -44,11 +49,12 @@ class Command(object):
     def run(self):
         kwargs = {
             'skip_child_jobs': self.args.skip_child_jobs,
+            'threads': self.args.threads,
         }
         if self.args.pipeline_instance:
-            self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance, **kwargs)
+            self.summer = summarizer.NewSummarizer(self.args.pipeline_instance, **kwargs)
         elif self.args.job:
-            self.summer = summarizer.JobSummarizer(self.args.job, **kwargs)
+            self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
         elif self.args.log_file:
             if self.args.log_file.endswith('.gz'):
                 fh = gzip.open(self.args.log_file)
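
In the argument-parser hunk above, --container and --container-request become aliases of --job, and a --threads option is added. A minimal sketch of the alias behaviour: argparse derives the destination from the first long option, so all three spellings land in args.job (the UUID below is illustrative):

    import argparse

    parser = argparse.ArgumentParser()
    src = parser.add_mutually_exclusive_group()
    src.add_argument('--job', '--container', '--container-request',
                     type=str, metavar='UUID')
    parser.add_argument('--threads', type=int, default=8)

    args = parser.parse_args(['--container-request', 'zzzzz-xvhdp-aaaaaaaaaaaaaaa'])
    print(args.job)      # the alias shares one dest, so the UUID lands in args.job
    print(args.threads)  # 8 (default), passed through to the summarizer
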
index eeb12f09e4283c19885bfbf264dfe180b9bf2a06..a0a838d6c9538c6231e4d284b6a2d2e5b7df77d6 100644 (file)
@@ -17,12 +17,12 @@ class CollectionReader(object):
         logger.debug('load collection %s', collection_id)
         collection = arvados.collection.CollectionReader(collection_id)
         filenames = [filename for filename in collection]
-        if len(filenames) != 1:
-            raise ValueError(
-                "collection {} has {} files; need exactly one".format(
-                    collection_id, len(filenames)))
-        self._reader = collection.open(filenames[0])
-        self._label = "{}/{}".format(collection_id, filenames[0])
+        if len(filenames) == 1:
+            filename = filenames[0]
+        else:
+            filename = 'crunchstat.txt'
+        self._reader = collection.open(filename)
+        self._label = "{}/{}".format(collection_id, filename)
 
     def __str__(self):
         return self._label
index d51c6e34e0d4c8036ac9a9c13d9b4d2a66018089..33b92305306fc969be5f2016555fd7e1c257b6f4 100644 (file)
@@ -41,9 +41,10 @@ class Task(object):
 
 
 class Summarizer(object):
-    def __init__(self, logdata, label=None, skip_child_jobs=False):
+    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
         self._logdata = logdata
 
+        self.uuid = uuid
         self.label = label
         self.starttime = None
         self.finishtime = None
@@ -69,122 +70,144 @@ class Summarizer(object):
 
     def run(self):
         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
+        self.detected_crunch1 = False
         for line in self._logdata:
-            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
-            if m:
-                seq = int(m.group('seq'))
-                uuid = m.group('task_uuid')
-                self.seq_to_uuid[seq] = uuid
-                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
-                continue
-
-            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
-            if m:
-                task_id = self.seq_to_uuid[int(m.group('seq'))]
-                elapsed = int(m.group('elapsed'))
-                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
-                if elapsed > self.stats_max['time']['elapsed']:
-                    self.stats_max['time']['elapsed'] = elapsed
-                continue
-
-            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
-            if m:
-                uuid = m.group('uuid')
-                if self._skip_child_jobs:
-                    logger.warning('%s: omitting stats from child job %s'
-                                   ' because --skip-child-jobs flag is on',
-                                   self.label, uuid)
+            if not self.detected_crunch1 and '-8i9sb-' in line:
+                self.detected_crunch1 = True
+
+            if self.detected_crunch1:
+                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
+                if m:
+                    seq = int(m.group('seq'))
+                    uuid = m.group('task_uuid')
+                    self.seq_to_uuid[seq] = uuid
+                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
                     continue
-                logger.debug('%s: follow %s', self.label, uuid)
-                child_summarizer = JobSummarizer(uuid)
-                child_summarizer.stats_max = self.stats_max
-                child_summarizer.task_stats = self.task_stats
-                child_summarizer.tasks = self.tasks
-                child_summarizer.starttime = self.starttime
-                child_summarizer.run()
-                logger.debug('%s: done %s', self.label, uuid)
-                continue
 
-            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
-            if not m:
-                continue
+                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
+                if m:
+                    task_id = self.seq_to_uuid[int(m.group('seq'))]
+                    elapsed = int(m.group('elapsed'))
+                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
+                    if elapsed > self.stats_max['time']['elapsed']:
+                        self.stats_max['time']['elapsed'] = elapsed
+                    continue
 
-            try:
-                if self.label is None:
-                    self.label = m.group('job_uuid')
-                    logger.debug('%s: using job uuid as label', self.label)
-                if m.group('category').endswith(':'):
-                    # "stderr crunchstat: notice: ..."
+                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
+                if m:
+                    uuid = m.group('uuid')
+                    if self._skip_child_jobs:
+                        logger.warning('%s: omitting stats from child job %s'
+                                       ' because --skip-child-jobs flag is on',
+                                       self.label, uuid)
+                        continue
+                    logger.debug('%s: follow %s', self.label, uuid)
+                    child_summarizer = ProcessSummarizer(uuid)
+                    child_summarizer.stats_max = self.stats_max
+                    child_summarizer.task_stats = self.task_stats
+                    child_summarizer.tasks = self.tasks
+                    child_summarizer.starttime = self.starttime
+                    child_summarizer.run()
+                    logger.debug('%s: done %s', self.label, uuid)
                     continue
-                elif m.group('category') in ('error', 'caught'):
+
+                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+                if not m:
                     continue
-                elif m.group('category') == 'read':
-                    # "stderr crunchstat: read /proc/1234/net/dev: ..."
-                    # (crunchstat formatting fixed, but old logs still say this)
+            else:
+                # crunch2
+                m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+                if not m:
                     continue
+
+            if self.label is None:
+                try:
+                    self.label = m.group('job_uuid')
+                except IndexError:
+                    self.label = 'container'
+            if m.group('category').endswith(':'):
+                # "stderr crunchstat: notice: ..."
+                continue
+            elif m.group('category') in ('error', 'caught'):
+                continue
+            elif m.group('category') in ['read', 'open', 'cgroup', 'CID']:
+                # "stderr crunchstat: read /proc/1234/net/dev: ..."
+                # (old logs are less careful with unprefixed error messages)
+                continue
+
+            if self.detected_crunch1:
                 task_id = self.seq_to_uuid[int(m.group('seq'))]
-                task = self.tasks[task_id]
+            else:
+                task_id = 'container'
+            task = self.tasks[task_id]
 
-                # Use the first and last crunchstat timestamps as
-                # approximations of starttime and finishtime.
+            # Use the first and last crunchstat timestamps as
+            # approximations of starttime and finishtime.
+            timestamp = m.group('timestamp')
+            if timestamp[10:11] == '_':
                 timestamp = datetime.datetime.strptime(
-                    m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
-                if not task.starttime:
-                    task.starttime = timestamp
-                    logger.debug('%s: task %s starttime %s',
-                                 self.label, task_id, timestamp)
-                task.finishtime = timestamp
-
-                if not self.starttime:
-                    self.starttime = timestamp
-                self.finishtime = timestamp
-
-                this_interval_s = None
-                for group in ['current', 'interval']:
-                    if not m.group(group):
-                        continue
-                    category = m.group('category')
-                    words = m.group(group).split(' ')
-                    stats = {}
+                    timestamp, '%Y-%m-%d_%H:%M:%S')
+            elif timestamp[10:11] == 'T':
+                timestamp = datetime.datetime.strptime(
+                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
+            else:
+                raise ValueError("Cannot parse timestamp {!r}".format(
+                    timestamp))
+
+            if not task.starttime:
+                task.starttime = timestamp
+                logger.debug('%s: task %s starttime %s',
+                             self.label, task_id, timestamp)
+            task.finishtime = timestamp
+
+            if not self.starttime:
+                self.starttime = timestamp
+            self.finishtime = timestamp
+
+            this_interval_s = None
+            for group in ['current', 'interval']:
+                if not m.group(group):
+                    continue
+                category = m.group('category')
+                words = m.group(group).split(' ')
+                stats = {}
+                try:
                     for val, stat in zip(words[::2], words[1::2]):
-                        try:
-                            if '.' in val:
-                                stats[stat] = float(val)
-                            else:
-                                stats[stat] = int(val)
-                        except ValueError as e:
-                            raise ValueError(
-                                'Error parsing {} stat: {!r}'.format(
-                                    stat, e))
-                    if 'user' in stats or 'sys' in stats:
-                        stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
-                    if 'tx' in stats or 'rx' in stats:
-                        stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
-                    for stat, val in stats.iteritems():
-                        if group == 'interval':
-                            if stat == 'seconds':
-                                this_interval_s = val
-                                continue
-                            elif not (this_interval_s > 0):
-                                logger.error(
-                                    "BUG? interval stat given with duration {!r}".
-                                    format(this_interval_s))
-                                continue
-                            else:
-                                stat = stat + '__rate'
-                                val = val / this_interval_s
-                                if stat in ['user+sys__rate', 'tx+rx__rate']:
-                                    task.series[category, stat].append(
-                                        (timestamp - self.starttime, val))
+                        if '.' in val:
+                            stats[stat] = float(val)
+                        else:
+                            stats[stat] = int(val)
+                except ValueError as e:
+                    logger.warning('Error parsing {} stat: {!r}'.format(
+                        stat, e))
+                    continue
+                if 'user' in stats or 'sys' in stats:
+                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
+                if 'tx' in stats or 'rx' in stats:
+                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
+                for stat, val in stats.iteritems():
+                    if group == 'interval':
+                        if stat == 'seconds':
+                            this_interval_s = val
+                            continue
+                        elif not (this_interval_s > 0):
+                            logger.error(
+                                "BUG? interval stat given with duration {!r}".
+                                format(this_interval_s))
+                            continue
                         else:
-                            if stat in ['rss']:
+                            stat = stat + '__rate'
+                            val = val / this_interval_s
+                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                 task.series[category, stat].append(
                                     (timestamp - self.starttime, val))
-                            self.task_stats[task_id][category][stat] = val
-                        if val > self.stats_max[category][stat]:
-                            self.stats_max[category][stat] = val
-            except Exception as e:
-                logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
+                    else:
+                        if stat in ['rss']:
+                            task.series[category, stat].append(
+                                (timestamp - self.starttime, val))
+                        self.task_stats[task_id][category][stat] = val
+                    if val > self.stats_max[category][stat]:
+                        self.stats_max[category][stat] = val
         logger.debug('%s: done parsing', self.label)
 
         self.job_tot = collections.defaultdict(
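
The large run() hunk above teaches the parser to handle both crunch1 job logs (detected by a "-8i9sb-" job UUID in the line, with timestamps like 2017-08-29_05:30:11) and crunch2 container logs (ISO 8601 timestamps, no task sequence number). The timestamp branch, extracted as a runnable sketch:

    import datetime

    def parse_crunchstat_timestamp(ts):
        # crunch1 logs: 2017-08-29_05:30:11; crunch2 logs: ISO 8601, of which
        # only the first 19 characters (seconds precision) are kept.
        if ts[10:11] == '_':
            return datetime.datetime.strptime(ts, '%Y-%m-%d_%H:%M:%S')
        elif ts[10:11] == 'T':
            return datetime.datetime.strptime(ts[:19], '%Y-%m-%dT%H:%M:%S')
        raise ValueError("Cannot parse timestamp {!r}".format(ts))

    print(parse_crunchstat_timestamp('2017-08-29_05:30:11'))
    print(parse_crunchstat_timestamp('2017-08-29T05:30:11.123456Z'))
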
@@ -284,19 +307,21 @@ class Summarizer(object):
     def _recommend_cpu(self):
         """Recommend asking for 4 cores if max CPU usage was 333%"""
 
+        constraint_key = self._map_runtime_constraint('vcpus')
         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
         if cpu_max_rate == float('-Inf'):
             logger.warning('%s: no CPU usage data', self.label)
             return
         used_cores = max(1, int(math.ceil(cpu_max_rate)))
-        asked_cores = self.existing_constraints.get('min_cores_per_node')
+        asked_cores = self.existing_constraints.get(constraint_key)
         if asked_cores is None or used_cores < asked_cores:
             yield (
                 '#!! {} max CPU usage was {}% -- '
-                'try runtime_constraints "min_cores_per_node":{}'
+                'try runtime_constraints "{}":{}'
             ).format(
                 self.label,
                 int(math.ceil(cpu_max_rate*100)),
+                constraint_key,
                 int(used_cores))
 
     def _recommend_ram(self):
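
_recommend_cpu above keys its suggestion off the peak user+sys rate: a maximum of 3.33 CPUs (333%) rounds up to 4 cores, reported against the container "vcpus" constraint or the crunch1 "min_cores_per_node" constraint. The arithmetic as a small sketch (recommend_cores is an illustrative name, not the method itself):

    import math

    def recommend_cores(cpu_max_rate, asked_cores=None):
        """Return a suggested core count, or None if the existing ask already fits."""
        used = max(1, int(math.ceil(cpu_max_rate)))
        if asked_cores is None or used < asked_cores:
            return used
        return None

    print(recommend_cores(3.33))     # 4: "max CPU usage was 333% -- try ... 4"
    print(recommend_cores(3.33, 4))  # None: the constraint already matches usage
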
@@ -333,40 +358,44 @@ class Summarizer(object):
         the memory we want -- even if that happens to be 8192 MiB.
         """
 
+        constraint_key = self._map_runtime_constraint('ram')
         used_bytes = self.stats_max['mem']['rss']
         if used_bytes == float('-Inf'):
             logger.warning('%s: no memory usage data', self.label)
             return
         used_mib = math.ceil(float(used_bytes) / 1048576)
-        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
+        asked_mib = self.existing_constraints.get(constraint_key)
 
         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
         if asked_mib is None or (
                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
             yield (
                 '#!! {} max RSS was {} MiB -- '
-                'try runtime_constraints "min_ram_mb_per_node":{}'
+                'try runtime_constraints "{}":{}'
             ).format(
                 self.label,
                 int(used_mib),
-                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
+                constraint_key,
+                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))
 
     def _recommend_keep_cache(self):
         """Recommend increasing keep cache if utilization < 80%"""
+        constraint_key = self._map_runtime_constraint('keep_cache_ram')
         if self.job_tot['net:keep0']['rx'] == 0:
             return
         utilization = (float(self.job_tot['blkio:0:0']['read']) /
                        float(self.job_tot['net:keep0']['rx']))
-        asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
+        asked_mib = self.existing_constraints.get(constraint_key, 256)
 
         if utilization < 0.8:
             yield (
                 '#!! {} Keep cache utilization was {:.2f}% -- '
-                'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
+                'try runtime_constraints "{}":{} (or more)'
             ).format(
                 self.label,
                 utilization * 100.0,
-                asked_mib*2)
+                constraint_key,
+                asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())
 
 
     def _format(self, val):
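A hedged numeric sketch of the RAM recommendation above. AVAILABLE_RAM_RATIO is defined elsewhere in summarizer.py and not shown in this diff, so the 0.95 used here is an assumption; mem_unit is likewise assumed to be 2**20 for job constraints (min_ram_mb_per_node, in MiB) and 1 for container constraints (ram, in bytes):

    import math

    AVAILABLE_RAM_RATIO = 0.95   # assumed value; see summarizer.py for the real one

    def recommend_ram(used_bytes, mem_unit):
        # Round the peak RSS up to "nearly gibibytes", then express the
        # recommendation in the constraint's own memory unit.
        used_mib = math.ceil(float(used_bytes) / 1048576)
        nearlygibs = lambda mebibytes: mebibytes / AVAILABLE_RAM_RATIO / 1024
        return int(math.ceil(nearlygibs(used_mib)) *
                   AVAILABLE_RAM_RATIO * 1024 * (2 ** 20) / mem_unit)

    # A task whose peak RSS was ~3000 MiB rounds up to 4 "nearly GiB":
    print(recommend_ram(3000 * 2**20, 2**20))  # job units (MiB)
    print(recommend_ram(3000 * 2**20, 1))      # container units (bytes)
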
@@ -378,6 +407,22 @@ class Summarizer(object):
         else:
             return '{}'.format(val)
 
+    def _runtime_constraint_mem_unit(self):
+        if hasattr(self, 'runtime_constraint_mem_unit'):
+            return self.runtime_constraint_mem_unit
+        elif self.detected_crunch1:
+            return JobSummarizer.runtime_constraint_mem_unit
+        else:
+            return ContainerSummarizer.runtime_constraint_mem_unit
+
+    def _map_runtime_constraint(self, key):
+        if hasattr(self, 'map_runtime_constraint'):
+            return self.map_runtime_constraint[key]
+        elif self.detected_crunch1:
+            return JobSummarizer.map_runtime_constraint[key]
+        else:
+            return key
+
 
 class CollectionSummarizer(Summarizer):
     def __init__(self, collection_id, **kwargs):
@@ -386,53 +431,91 @@ class CollectionSummarizer(Summarizer):
         self.label = collection_id
 
 
-class JobSummarizer(Summarizer):
-    def __init__(self, job, **kwargs):
-        arv = arvados.api('v1')
-        if isinstance(job, basestring):
-            self.job = arv.jobs().get(uuid=job).execute()
-        else:
-            self.job = job
+def NewSummarizer(process_or_uuid, **kwargs):
+    """Construct a summarizer of the appropriate subclass for this uuid or process object."""
+
+    if isinstance(process_or_uuid, dict):
+        process = process_or_uuid
+        uuid = process['uuid']
+    else:
+        uuid = process_or_uuid
+        process = None
+        arv = arvados.api('v1', model=OrderedJsonModel())
+
+    if '-dz642-' in uuid:
+        if process is None:
+            process = arv.containers().get(uuid=uuid).execute()
+        klass = ContainerTreeSummarizer
+    elif '-xvhdp-' in uuid:
+        if process is None:
+            process = arv.container_requests().get(uuid=uuid).execute()
+        klass = ContainerTreeSummarizer
+    elif '-8i9sb-' in uuid:
+        if process is None:
+            process = arv.jobs().get(uuid=uuid).execute()
+        klass = JobSummarizer
+    elif '-d1hrv-' in uuid:
+        if process is None:
+            process = arv.pipeline_instances().get(uuid=uuid).execute()
+        klass = PipelineSummarizer
+    elif '-4zz18-' in uuid:
+        return CollectionSummarizer(collection_id=uuid)
+    else:
+        raise ValueError("Unrecognized uuid %s" % uuid)
+    return klass(process, uuid=uuid, **kwargs)
+
+
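NewSummarizer is now the single entry point: the uuid infix selects the summarizer class, so callers no longer need to know whether they were handed a job, pipeline instance, container, or container request. A minimal usage sketch (the uuid is hypothetical, and an Arvados API token is assumed to be configured in the environment):

    from crunchstat_summary import summarizer

    # '-xvhdp-' marks a container request, so this builds a ContainerTreeSummarizer
    s = summarizer.NewSummarizer('zzzzz-xvhdp-0123456789abcde')
    s.run()
    print(s.text_report())
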
+class ProcessSummarizer(Summarizer):
+    """Process is a job, pipeline, container, or container request."""
+
+    def __init__(self, process, label=None, **kwargs):
         rdr = None
-        if self.job.get('log'):
+        self.process = process
+        if label is None:
+            label = self.process.get('name', self.process['uuid'])
+        if self.process.get('log'):
             try:
-                rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
+                rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
             except arvados.errors.NotFoundError as e:
                 logger.warning("Trying event logs after failing to read "
-                               "log collection %s: %s", self.job['log'], e)
-            else:
-                label = self.job['uuid']
+                               "log collection %s: %s", self.process['log'], e)
         if rdr is None:
-            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
-            label = self.job['uuid'] + ' (partial)'
-        super(JobSummarizer, self).__init__(rdr, **kwargs)
-        self.label = label
-        self.existing_constraints = self.job.get('runtime_constraints', {})
+            rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
+            label = label + ' (partial)'
+        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
+        self.existing_constraints = self.process.get('runtime_constraints', {})
 
 
-class PipelineSummarizer(object):
-    def __init__(self, pipeline_instance_uuid, **kwargs):
-        arv = arvados.api('v1', model=OrderedJsonModel())
-        instance = arv.pipeline_instances().get(
-            uuid=pipeline_instance_uuid).execute()
-        self.summarizers = collections.OrderedDict()
-        for cname, component in instance['components'].iteritems():
-            if 'job' not in component:
-                logger.warning(
-                    "%s: skipping component with no job assigned", cname)
-            else:
-                logger.info(
-                    "%s: job %s", cname, component['job']['uuid'])
-                summarizer = JobSummarizer(component['job'], **kwargs)
-                summarizer.label = '{} {}'.format(
-                    cname, component['job']['uuid'])
-                self.summarizers[cname] = summarizer
-        self.label = pipeline_instance_uuid
+class JobSummarizer(ProcessSummarizer):
+    runtime_constraint_mem_unit = 1048576
+    map_runtime_constraint = {
+        'keep_cache_ram': 'keep_cache_mb_per_task',
+        'ram': 'min_ram_mb_per_node',
+        'vcpus': 'min_cores_per_node',
+    }
+
+
+class ContainerSummarizer(ProcessSummarizer):
+    runtime_constraint_mem_unit = 1
+
+
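Together with the _map_runtime_constraint/_runtime_constraint_mem_unit helpers above, these class attributes are what vary the recommendation text between the two flavours: crunch1 jobs keep the legacy MiB-denominated constraint names, while containers use the native names with values in bytes. A few assertion-style lines restating that mapping (illustrative only, not part of the patch):

    assert JobSummarizer.map_runtime_constraint['ram'] == 'min_ram_mb_per_node'
    assert JobSummarizer.map_runtime_constraint['vcpus'] == 'min_cores_per_node'
    assert JobSummarizer.runtime_constraint_mem_unit == 2 ** 20    # job constraints are in MiB
    assert ContainerSummarizer.runtime_constraint_mem_unit == 1    # container constraints are in bytes
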
+class MultiSummarizer(object):
+    def __init__(self, children=None, label=None, threads=1, **kwargs):
+        self.throttle = threading.Semaphore(threads)
+        self.children = children if children is not None else collections.OrderedDict()
+        self.label = label
+
+    def run_and_release(self, target, *args, **kwargs):
+        try:
+            return target(*args, **kwargs)
+        finally:
+            self.throttle.release()
 
     def run(self):
         threads = []
-        for summarizer in self.summarizers.itervalues():
-            t = threading.Thread(target=summarizer.run)
+        for child in self.children.itervalues():
+            self.throttle.acquire()
+            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
             t.daemon = True
             t.start()
             threads.append(t)
@@ -441,12 +524,77 @@ class PipelineSummarizer(object):
 
     def text_report(self):
         txt = ''
-        for cname, summarizer in self.summarizers.iteritems():
-            txt += '### Summary for {} ({})\n'.format(
-                cname, summarizer.job['uuid'])
-            txt += summarizer.text_report()
+        for cname, child in self.children.iteritems():
+            if len(self.children) > 1:
+                txt += '### Summary for {} ({})\n'.format(
+                    cname, child.process['uuid'])
+            txt += child.text_report()
             txt += '\n'
         return txt
 
     def html_report(self):
-        return WEBCHART_CLASS(self.label, self.summarizers.itervalues()).html()
+        return WEBCHART_CLASS(self.label, self.children.itervalues()).html()
+
+
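MultiSummarizer runs each child on a daemon thread but caps concurrency with a semaphore: run() acquires a slot before spawning, and run_and_release() gives it back when the child finishes, even on error. A standalone sketch of the same throttling pattern (illustrative, not the upstream code):

    import threading

    def run_all(tasks, threads=4):
        # At most `threads` tasks execute concurrently.
        throttle = threading.Semaphore(threads)

        def run_and_release(task):
            try:
                task()
            finally:
                throttle.release()   # always return the slot

        workers = []
        for task in tasks:
            throttle.acquire()       # blocks once `threads` tasks are in flight
            t = threading.Thread(target=run_and_release, args=(task,))
            t.daemon = True
            t.start()
            workers.append(t)
        for t in workers:
            t.join()
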
+class PipelineSummarizer(MultiSummarizer):
+    def __init__(self, instance, **kwargs):
+        children = collections.OrderedDict()
+        for cname, component in instance['components'].iteritems():
+            if 'job' not in component:
+                logger.warning(
+                    "%s: skipping component with no job assigned", cname)
+            else:
+                logger.info(
+                    "%s: job %s", cname, component['job']['uuid'])
+                summarizer = JobSummarizer(component['job'], **kwargs)
+                summarizer.label = '{} {}'.format(
+                    cname, component['job']['uuid'])
+                children[cname] = summarizer
+        super(PipelineSummarizer, self).__init__(
+            children=children,
+            label=instance['uuid'],
+            **kwargs)
+
+
+class ContainerTreeSummarizer(MultiSummarizer):
+    def __init__(self, root, **kwargs):
+        arv = arvados.api('v1', model=OrderedJsonModel())
+
+        label = kwargs.pop('label', None) or root.get('name') or root['uuid']
+        root['name'] = label
+
+        children = collections.OrderedDict()
+        todo = collections.deque((root, ))
+        while len(todo) > 0:
+            current = todo.popleft()
+            label = current['name']
+            sort_key = current['created_at']
+            if '-xvhdp-' in current['uuid']:
+                current = arv.containers().get(uuid=current['container_uuid']).execute()
+
+            summer = ContainerSummarizer(current, label=label, **kwargs)
+            summer.sort_key = sort_key
+            children[current['uuid']] = summer
+
+            page_filters = []
+            while True:
+                items = arv.container_requests().index(
+                    order=['uuid asc'],
+                    filters=page_filters+[
+                        ['requesting_container_uuid', '=', current['uuid']]],
+                ).execute()['items']
+                if not items:
+                    break
+                page_filters = [['uuid', '>', items[-1]['uuid']]]
+                for cr in items:
+                    if cr['container_uuid']:
+                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
+                        cr['name'] = cr.get('name') or cr['uuid']
+                        todo.append(cr)
+        sorted_children = collections.OrderedDict()
+        for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
+            sorted_children[uuid] = children[uuid]
+        super(ContainerTreeSummarizer, self).__init__(
+            children=sorted_children,
+            label=root['name'],
+            **kwargs)
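
ContainerTreeSummarizer walks the request tree breadth-first, and pages through each container's child requests by uuid rather than by offset: after each page the filter is narrowed to uuids greater than the last one seen, which keeps paging stable even if rows appear mid-walk. A minimal sketch of that paging pattern in isolation (assumes `arv` is an arvados.api('v1') client and `parent_uuid` is a container uuid):

    def child_container_requests(arv, parent_uuid):
        """Yield every container request made by the given container."""
        page_filters = []
        while True:
            items = arv.container_requests().index(
                order=['uuid asc'],
                filters=page_filters + [
                    ['requesting_container_uuid', '=', parent_uuid]],
            ).execute()['items']
            if not items:
                break
            # Keyset pagination: the next page starts strictly after the last uuid seen.
            page_filters = [['uuid', '>', items[-1]['uuid']]]
            for cr in items:
                yield cr
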
diff --git a/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz
new file mode 100644 (file)
index 0000000..8b069e7
Binary files /dev/null and b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz differ
diff --git a/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report
new file mode 100644 (file)
index 0000000..88e06a3
--- /dev/null
@@ -0,0 +1,23 @@
+category       metric  task_max        task_max_rate   job_total
+cpu    cpus    20      -       -
+cpu    sys     0.82    0.08    0.82
+cpu    user    2.31    0.22    2.31
+cpu    user+sys        3.13    0.30    3.13
+mem    cache   23846912        -       -
+mem    pgmajfault      121     -       121
+mem    rss     65470464        -       -
+mem    swap    0       -       -
+net:eth0       rx      500762  951.15  500762
+net:eth0       tx      36242   226.61  36242
+net:eth0       tx+rx   537004  1177.76 537004
+# Number of tasks: 1
+# Max CPU time spent by a single task: 3.13s
+# Max CPU usage in a single interval: 29.89%
+# Overall CPU usage: 0%
+# Max memory used by a single task: 0.07GB
+# Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! container max CPU usage was 30% -- try runtime_constraints "vcpus":1
+#!! container max RSS was 63 MiB -- try runtime_constraints "ram":1020054732
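
The RSS recommendation on the last line follows directly from _recommend_ram() with the container memory unit of 1 byte; assuming AVAILABLE_RAM_RATIO is 0.95 (which is what the expected value implies), the arithmetic checks out as follows (illustrative only):

    import math

    AVAILABLE_RAM_RATIO = 0.95   # assumed; defined near the top of summarizer.py
    used_mib = 63                # 65470464 bytes of max RSS, rounded up to whole MiB
    nearlygibs = used_mib / AVAILABLE_RAM_RATIO / 1024.0
    recommended = int(math.ceil(nearlygibs) * AVAILABLE_RAM_RATIO * 1024 * (2 ** 20) / 1)
    assert recommended == 1020054732   # matches the "ram" value recommended above
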
index d060becec3dddd7caa852fb2465d3284bb2cd6b8..f5fde5fdc4ebe864e96a092070541dce6f247ea6 100644 (file)
@@ -59,6 +59,38 @@ class SummarizeEdgeCases(unittest.TestCase):
         s.run()
 
 
+class SummarizeContainer(ReportDiff):
+    fake_container = {
+        'uuid': '9tee4-dz642-mjfb0i5hzojp16a',
+        'created_at': '2017-08-18T14:27:25.371388141',
+        'log': '9tee4-4zz18-ihyzym9tcwjwg4r',
+    }
+    fake_request = {
+        'uuid': '9tee4-xvhdp-uper95jktm10d3w',
+        'name': 'container',
+        'created_at': '2017-08-18T14:27:25.242339223Z',
+        'container_uuid': fake_container['uuid'],
+    }
+    logfile = os.path.join(
+        TESTS_DIR, 'container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz')
+
+    @mock.patch('arvados.collection.CollectionReader')
+    @mock.patch('arvados.api')
+    def test_container(self, mock_api, mock_cr):
+        mock_api().container_requests().index().execute.return_value = {'items':[]}
+        mock_api().container_requests().get().execute.return_value = self.fake_request
+        mock_api().containers().get().execute.return_value = self.fake_container
+        mock_cr().__iter__.return_value = [
+            'crunch-run.txt', 'stderr.txt', 'node-info.txt',
+            'container.json', 'crunchstat.txt']
+        mock_cr().open.return_value = gzip.open(self.logfile)
+        args = crunchstat_summary.command.ArgumentParser().parse_args(
+            ['--job', self.fake_request['uuid']])
+        cmd = crunchstat_summary.command.Command(args)
+        cmd.run()
+        self.diff_known_report(self.logfile, cmd)
+
+
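This test leans on how MagicMock resolves chained calls: mock_api().containers().get().execute names the same child mock no matter what arguments are eventually passed, so the return values configured above are exactly what NewSummarizer receives from arv.containers().get(uuid=...).execute(). A tiny demonstration of that behaviour (using the same mock library the tests import):

    import mock

    api = mock.MagicMock()
    api().containers().get().execute.return_value = {'uuid': 'fake'}
    # Different (or no) call arguments still resolve to the same configured child mock.
    assert api('v1').containers().get(uuid='anything').execute() == {'uuid': 'fake'}
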
 class SummarizeJob(ReportDiff):
     fake_job_uuid = '4xphq-8i9sb-jq0ekny1xou3zoh'
     fake_log_id = 'fake-log-collection-id'
index 958c81d4df8f408ade30f5047c2996af35e8c271..f9c46e7325aed6358e1cc8296d0a364fea27a908 100644 (file)
@@ -3,22 +3,25 @@
        "ignore": "test",
        "package": [
                {
-                       "checksumSHA1": "b68aaMZImS90FjnReAxpbp20FGA=",
+                       "checksumSHA1": "jf7K+UTQNIzRdlG5F4zX/8b++/E=",
                        "origin": "github.com/curoverse/goamz/aws",
                        "path": "github.com/AdRoll/goamz/aws",
-                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9"
+                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+                       "revisionTime": "2017-07-27T13:52:37Z"
                },
                {
-                       "checksumSHA1": "ey9ddXTW9dncjJz/COKpeYm+sgg=",
+                       "checksumSHA1": "9nUwQXI+pNxZo6bnR7NslpMpfPI=",
                        "origin": "github.com/curoverse/goamz/s3",
                        "path": "github.com/AdRoll/goamz/s3",
-                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9"
+                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+                       "revisionTime": "2017-07-27T13:52:37Z"
                },
                {
-                       "checksumSHA1": "pDHYVqUQtRsPYw/X4kUrdK7pxMs=",
+                       "checksumSHA1": "tvxbsTkdjB0C/uxEglqD6JfVnMg=",
                        "origin": "github.com/curoverse/goamz/s3/s3test",
                        "path": "github.com/AdRoll/goamz/s3/s3test",
-                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9"
+                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+                       "revisionTime": "2017-07-27T13:52:37Z"
                },
                {
                        "checksumSHA1": "Rjy2uYZkQ8Kjht6ZFU0qzm2I/kI=",
                        "revisionTime": "2017-03-24T20:46:54Z"
                },
                {
-                       "checksumSHA1": "Gk3jTNQ5uGDUE0WMJFWcYz9PMps=",
+                       "checksumSHA1": "q5SZBWFVC3wOIzftf+l/h5WLG1k=",
                        "path": "github.com/lib/pq/oid",
                        "revision": "2704adc878c21e1329f46f6e56a1c387d788ff94",
                        "revisionTime": "2017-03-24T20:46:54Z"
                        "revisionTime": "2017-05-12T22:20:15Z"
                },
                {
-                       "checksumSHA1": "ENl6I8+3AaBanbn9CVExMjDTHPc=",
+                       "checksumSHA1": "dUfdXzRJupI9VpqNR2LlppeZvLc=",
                        "origin": "github.com/docker/docker/vendor/golang.org/x/sys/unix",
                        "path": "golang.org/x/sys/unix",
                        "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",