services/api/config/arvados-clients.yml
*#*
.DS_Store
+.vscode
end
def show_file_links
- if Rails.configuration.keep_web_url || Rails.configuration.keep_web_download_url
- # show_file will redirect to keep-web's directory listing
- return show_file
- end
- Thread.current[:reader_tokens] = [params[:reader_token]]
- return if false.equal?(find_object_by_uuid)
- render layout: false
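+ # show_file will redirect to keep-web's directory listing.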
+ return show_file
end
def show_file
- # We pipe from arv-get to send the file to the user. Before we start it,
- # we ask the API server if the file actually exists. This serves two
- # purposes: it lets us return a useful status code for common errors, and
- # helps us figure out which token to provide to arv-get.
# The order of searched tokens is important: because the anonymous user
# token is passed along with every API request, we have to check it first.
# Otherwise, it's impossible to know whether any other request succeeded.
return
end
- # If we are configured to use a keep-web server, just redirect to
- # the appropriate URL.
- if Rails.configuration.keep_web_url or
- Rails.configuration.keep_web_download_url
- opts = {}
- if usable_token == params[:reader_token]
- opts[:path_token] = usable_token
- elsif usable_token == Rails.configuration.anonymous_user_token
- # Don't pass a token at all
- else
- # We pass the current user's real token only if it's necessary
- # to read the collection.
- opts[:query_token] = usable_token
- end
- opts[:disposition] = params[:disposition] if params[:disposition]
- return redirect_to keep_web_url(params[:uuid], params[:file], opts)
- end
-
- # No keep-web server available. Get the file data with arv-get,
- # and serve it through Rails.
-
- file_name = params[:file].andand.sub(/^(\.\/|\/|)/, './')
- if file_name.nil? or not coll.manifest.has_file?(file_name)
- return render_not_found
- end
-
- opts = params.merge(arvados_api_token: usable_token)
-
- # Handle Range requests. Currently we support only 'bytes=0-....'
- if request.headers.include? 'HTTP_RANGE'
- if m = /^bytes=0-(\d+)/.match(request.headers['HTTP_RANGE'])
- opts[:maxbytes] = m[1]
- size = params[:size] || '*'
- self.response.status = 206
- self.response.headers['Content-Range'] = "bytes 0-#{m[1]}/#{size}"
- end
- end
-
- ext = File.extname(params[:file])
- self.response.headers['Content-Type'] =
- Rack::Mime::MIME_TYPES[ext] || 'application/octet-stream'
- if params[:size]
- size = params[:size].to_i
- if opts[:maxbytes]
- size = [size, opts[:maxbytes].to_i].min
- end
- self.response.headers['Content-Length'] = size.to_s
- end
- self.response.headers['Content-Disposition'] = params[:disposition] if params[:disposition]
- begin
- file_enumerator(opts).each do |bytes|
- response.stream.write bytes
- end
- ensure
- response.stream.close
+ opts = {}
+ if usable_token == params[:reader_token]
+ opts[:path_token] = usable_token
+ elsif usable_token == Rails.configuration.anonymous_user_token
+ # Don't pass a token at all
+ else
+ # We pass the current user's real token only if it's necessary
+ # to read the collection.
+ opts[:query_token] = usable_token
end
+ opts[:disposition] = params[:disposition] if params[:disposition]
+ return redirect_to keep_web_url(params[:uuid], params[:file], opts)
end
def sharing_scopes
def download_link
token = @search_sharing.first.api_token
- if Rails.configuration.keep_web_url || Rails.configuration.keep_web_download_url
- keep_web_url(@object.uuid, nil, {path_token: token})
- else
- collections_url + "/download/#{@object.uuid}/#{token}/"
- end
+ keep_web_url(@object.uuid, nil, {path_token: token})
end
def share
uri.to_s
end
-
- # Note: several controller and integration tests rely on stubbing
- # file_enumerator to return fake file content.
- def file_enumerator opts
- FileStreamer.new opts
- end
-
- class FileStreamer
- include ArvadosApiClientHelper
- def initialize(opts={})
- @opts = opts
- end
- def each
- return unless @opts[:uuid] && @opts[:file]
-
- env = Hash[ENV].dup
-
- require 'uri'
- u = URI.parse(arvados_api_client.arvados_v1_base)
- env['ARVADOS_API_HOST'] = "#{u.host}:#{u.port}"
- env['ARVADOS_API_TOKEN'] = @opts[:arvados_api_token]
- env['ARVADOS_API_HOST_INSECURE'] = "true" if Rails.configuration.arvados_insecure_https
-
- bytesleft = @opts[:maxbytes].andand.to_i || 2**16
- io = IO.popen([env, 'arv-get', "#{@opts[:uuid]}/#{@opts[:file]}"], 'rb')
- while bytesleft > 0 && (buf = io.read([bytesleft, 2**16].min)) != nil
- # shrink the bytesleft count, if we were given a maximum byte
- # count to read
- if @opts.include? :maxbytes
- bytesleft = bytesleft - buf.length
- end
- yield buf
- end
- io.close
- # "If ios is opened by IO.popen, close sets $?."
- # http://www.ruby-doc.org/core-2.1.3/IO.html#method-i-close
- Rails.logger.warn("#{@opts[:uuid]}/#{@opts[:file]}: #{$?}") if $? != 0
- end
- end
end
before_filter :check_auth_header
def check_auth_header
- mgmt_token = Rails.configuration.management_token
+ mgmt_token = Rails.configuration.ManagementToken
auth_header = request.headers['Authorization']
if !mgmt_token
$("#log-viewer-download-pane").show();
var headers = {};
if (log_size > log_maxbytes) {
- headers['Range'] = 'bytes=0-' + log_maxbytes;
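+ // HTTP Range end positions are inclusive, so 0-(log_maxbytes-1) requests exactly log_maxbytes bytes.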
+ headers['Range'] = 'bytes=0-' + (log_maxbytes - 1);
}
var ajax_opts = { dataType: 'text', headers: headers };
load_log();
profiling_enabled: true
secret_token: <%= rand(2**256).to_s(36) %>
secret_key_base: <%= rand(2**256).to_s(36) %>
+ # This setting lets Workbench start when running tests; set it to a real
+ # value when testing the relevant features.
+ keep_web_url: http://example.com/c=%{uuid_or_pdh}
# When you run the Workbench's integration tests, it starts the API
# server as a dependency. These settings should match the API
shell_in_a_box_url: false
# Format of preview links. If false, use keep_web_download_url
- # instead, and disable inline preview. If both are false, use
- # Workbench's built-in file download/preview mechanism.
+ # instead, and disable inline preview.
+ # If both are false, Workbench refuses to start: at least one is mandatory.
#
# Examples:
# keep_web_url: https://%{uuid_or_pdh}.collections.uuid_prefix.arvadosapi.com
# Token to be included in all healthcheck requests. Disabled by default.
# Workbench expects request header of the format "Authorization: Bearer xxx"
- management_token: false
+ ManagementToken: false
arvados_v1_base: https://arvados.local:3030/arvados/v1
arvados_insecure_https: true
+ # You need to configure at least one of these:
+ keep_web_url: false
+ keep_web_download_url: false
+
production:
# At minimum, you need a nice long randomly generated secret_token here.
secret_token: ~
arvados_login_base: https://arvados.local:3030/login
arvados_v1_base: https://arvados.local:3030/arvados/v1
arvados_insecure_https: false
+
+ # You need to configure at least one of these:
+ keep_web_url: false
+ keep_web_download_url: false
cfg.send "#{k}=", v
end
end
- if !nils.empty?
+ if !nils.empty? and not ::Rails.groups.include?('assets')
raise <<EOS
Refusing to start in #{::Rails.env.to_s} mode with missing configuration.
The following configuration settings must be specified in
config/application.yml:
* #{nils.join "\n* "}
+EOS
+ end
+ # Refuse to start if keep-web isn't configured
+ if not (config.keep_web_url or config.keep_web_download_url) and not ::Rails.groups.include?('assets')
+ raise <<EOS
+Refusing to start in #{::Rails.env.to_s} mode with missing configuration.
+
+At least one of the following keep-web settings must be specified in config/application.yml:
+* keep_web_url
+* keep_web_download_url
+
EOS
end
end
end
end
- def stub_file_content
- # For the duration of the current test case, stub file download
- # content with a randomized (but recognizable) string. Return the
- # string, the test case can use it in assertions.
- txt = 'the quick brown fox ' + rand(2**32).to_s
- @controller.stubs(:file_enumerator).returns([txt])
- txt
- end
-
def collection_params(collection_name, file_name=nil)
uuid = api_fixture('collections')[collection_name.to_s]['uuid']
params = {uuid: uuid, id: uuid}
end
test "download a file with spaces in filename" do
+ setup_for_keep_web
collection = api_fixture('collections')['w_a_z_file']
- fakepipe = IO.popen(['echo', '-n', 'w a z'], 'rb')
- IO.expects(:popen).with { |cmd, mode|
- cmd.include? "#{collection['uuid']}/w a z"
- }.returns(fakepipe)
get :show_file, {
uuid: collection['uuid'],
file: 'w a z'
}, session_for(:active)
- assert_response :success
- assert_equal 'w a z', response.body
+ assert_response :redirect
+ assert_match /w%20a%20z/, response.redirect_url
end
test "viewing a collection fetches related projects" do
params[:reader_token] = api_fixture("api_client_authorizations",
"active_all_collections", "api_token")
get(:show_file_links, params)
- assert_response :success
- assert_equal([['.', 'foo', 3]], assigns(:object).files)
+ assert_response :redirect
assert_no_session
end
test "fetching collection file with reader token" do
- expected = stub_file_content
+ setup_for_keep_web
params = collection_params(:foo_file, "foo")
params[:reader_token] = api_fixture("api_client_authorizations",
"active_all_collections", "api_token")
get(:show_file, params)
- assert_response :success
- assert_equal(expected, @response.body,
- "failed to fetch a Collection file with a reader token")
+ assert_response :redirect
+ assert_match /foo/, response.redirect_url
assert_no_session
end
end
test "getting a file from Keep" do
+ setup_for_keep_web
params = collection_params(:foo_file, 'foo')
sess = session_for(:active)
- expect_content = stub_file_content
get(:show_file, params, sess)
- assert_response :success
- assert_equal(expect_content, @response.body,
- "failed to get a correct file from Keep")
+ assert_response :redirect
+ assert_match /foo/, response.redirect_url
end
test 'anonymous download' do
+ setup_for_keep_web
config_anonymous true
- expect_content = stub_file_content
get :show_file, {
uuid: api_fixture('collections')['user_agreement_in_anonymously_accessible_project']['uuid'],
file: 'GNU_General_Public_License,_version_3.pdf',
}
- assert_response :success
- assert_equal expect_content, response.body
+ assert_response :redirect
+ assert_match /GNU_General_Public_License/, response.redirect_url
end
test "can't get a file from Keep without permission" do
assert_response 404
end
- test "trying to get a nonexistent file from Keep returns a 404" do
- params = collection_params(:foo_file, 'gone')
- sess = session_for(:admin)
- get(:show_file, params, sess)
- assert_response 404
- end
-
test "getting a file from Keep with a good reader token" do
+ setup_for_keep_web
params = collection_params(:foo_file, 'foo')
read_token = api_fixture('api_client_authorizations')['active']['api_token']
params[:reader_token] = read_token
- expect_content = stub_file_content
get(:show_file, params)
- assert_response :success
- assert_equal(expect_content, @response.body,
- "failed to get a correct file from Keep using a reader token")
+ assert_response :redirect
+ assert_match /foo/, response.redirect_url
assert_not_equal(read_token, session[:arvados_api_token],
"using a reader token set the session's API token")
end
end
test "can get a file with an unpermissioned auth but in-scope reader token" do
+ setup_for_keep_web
params = collection_params(:foo_file, 'foo')
sess = session_for(:expired)
read_token = api_fixture('api_client_authorizations')['active']['api_token']
params[:reader_token] = read_token
- expect_content = stub_file_content
get(:show_file, params, sess)
- assert_response :success
- assert_equal(expect_content, @response.body,
- "failed to get a correct file from Keep using a reader token")
+ assert_response :redirect
assert_not_equal(read_token, session[:arvados_api_token],
"using a reader token set the session's API token")
end
test "inactive user can retrieve user agreement" do
+ setup_for_keep_web
ua_collection = api_fixture('collections')['user_agreement']
# Here we don't test whether the agreement can be retrieved from
- # Keep. We only test that show_file decides to send file content,
- # so we use the file content stub.
- stub_file_content
+ # Keep. We only test that show_file decides to serve the file, which
+ # now means redirecting to keep-web.
get :show_file, {
uuid: ua_collection['uuid'],
file: ua_collection['manifest_text'].match(/ \d+:\d+:(\S+)/)[1]
assert_nil(assigns(:unsigned_user_agreements),
"Did not skip check_user_agreements filter " +
"when showing the user agreement.")
- assert_response :success
+ assert_response :redirect
end
test "requesting nonexistent Collection returns 404" do
:active, 404)
end
- test "use a reasonable read buffer even if client requests a huge range" do
- fakefiledata = mock
- IO.expects(:popen).returns(fakefiledata)
- fakefiledata.expects(:read).twice.with() do |length|
- # Fail the test if read() is called with length>1MiB:
- length < 2**20
- ## Force the ActionController::Live thread to lose the race to
- ## verify that @response.body.length actually waits for the
- ## response (see below):
- # sleep 3
- end.returns("foo\n", nil)
- fakefiledata.expects(:close)
- foo_file = api_fixture('collections')['foo_file']
- @request.headers['Range'] = 'bytes=0-4294967296/*'
- get :show_file, {
- uuid: foo_file['uuid'],
- file: foo_file['manifest_text'].match(/ \d+:\d+:(\S+)/)[1]
- }, session_for(:active)
- # Wait for the whole response to arrive before deciding whether
- # mocks' expectations were met. Otherwise, Mocha will fail the
- # test depending on how slowly the ActionController::Live thread
- # runs.
- @response.body.length
- end
-
test "show file in a subdirectory of a collection" do
+ setup_for_keep_web
params = collection_params(:collection_with_files_in_subdir, 'subdir2/subdir3/subdir4/file1_in_subdir4.txt')
- expect_content = stub_file_content
get(:show_file, params, session_for(:user1_with_load))
- assert_response :success
- assert_equal(expect_content, @response.body, "failed to get a correct file from Keep")
+ assert_response :redirect
+ assert_match /subdir2\/subdir3\/subdir4\/file1_in_subdir4\.txt/, response.redirect_url
end
test 'provenance graph' do
def setup_for_keep_web cfg='https://%{uuid_or_pdh}.example', dl_cfg=false
Rails.configuration.keep_web_url = cfg
Rails.configuration.keep_web_download_url = dl_cfg
- @controller.expects(:file_enumerator).never
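+ # With keep-web configured, show_file redirects instead of streaming,
+ # so there is no file_enumerator left to stub.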
end
%w(uuid portable_data_hash).each do |id_type|
[true, 'Bearer configuredmanagementtoken', 200, '{"health":"OK"}'],
].each do |enabled, header, error_code, error_msg|
test "ping when #{if enabled then 'enabled' else 'disabled' end} with header '#{header}'" do
- Rails.configuration.management_token = 'configuredmanagementtoken' if enabled
+ Rails.configuration.ManagementToken = 'configuredmanagementtoken' if enabled
@request.headers['Authorization'] = header
get :ping
require 'integration_helper'
class AnonymousAccessTest < ActionDispatch::IntegrationTest
+ include KeepWebConfig
+
# These tests don't do state-changing API calls. Save some time by
# skipping the database reset.
reset_api_fixtures :after_each_test, false
end
test 'view file' do
+ use_keep_web_config
+
magic = rand(2**512).to_s 36
- CollectionsController.any_instance.stubs(:file_enumerator).returns([magic])
- collection = api_fixture('collections')['public_text_file']
- visit '/collections/' + collection['uuid']
+ owner = api_fixture('groups')['anonymously_accessible_project']['uuid']
+ col = upload_data_and_get_collection(magic, 'admin', "Hello\\040world.txt", owner)
+ visit '/collections/' + col.uuid
find('tr,li', text: 'Hello world.txt').
find('a[title~=View]').click
assert_text magic
require_relative 'integration_test_utils'
class CollectionsTest < ActionDispatch::IntegrationTest
+ include KeepWebConfig
+
setup do
need_javascript
end
test "creating and uncreating a sharing link" do
coll_uuid = api_fixture("collections", "collection_owned_by_active", "uuid")
download_link_re =
- Regexp.new(Regexp.escape("/collections/download/#{coll_uuid}/"))
+ Regexp.new(Regexp.escape("/c=#{coll_uuid}/"))
visit page_with_token("active_trustedclient", "/collections/#{coll_uuid}")
within "#sharing-button" do
check_sharing(:on, download_link_re)
end
test "can download an entire collection with a reader token" do
- Capybara.current_driver = :rack_test
- CollectionsController.any_instance.
- stubs(:file_enumerator).returns(["foo\n", "file\n"])
- uuid = api_fixture('collections')['foo_file']['uuid']
+ use_keep_web_config
+
+ token = api_fixture('api_client_authorizations')['active']['api_token']
+ data = "foo\nfile\n"
+ datablock = `echo -n #{data.shellescape} | ARVADOS_API_TOKEN=#{token.shellescape} arv-put --no-progress --raw -`.strip
+ assert $?.success?, $?
+
+ col = nil
+ use_token 'active' do
+ mtxt = ". #{datablock} 0:#{data.length}:foo\n"
+ col = Collection.create(manifest_text: mtxt)
+ end
+
+ uuid = col.uuid
token = api_fixture('api_client_authorizations')['active_all_collections']['api_token']
url_head = "/collections/download/#{uuid}/#{token}/"
visit url_head
end
assert_equal(['foo'], hrefs.compact.sort,
"download page did provide strictly file links")
- within "#collection_files" do
- click_link "foo"
- assert_equal("foo\nfile\n", page.html)
- end
+ click_link "foo"
+ assert_text "foo\nfile\n"
end
test "combine selected collections into new collection" do
assert_selector 'a[href="/"]', text: 'Go to dashboard'
end
- test "view job log" do
- job = api_fixture('jobs')['job_with_real_log']
-
- IO.expects(:popen).returns(fakepipe_with_log_data)
-
- visit page_with_token("active", "/jobs/#{job['uuid']}")
- assert page.has_text? job['script_version']
-
- find(:xpath, "//a[@href='#Log']").click
- wait_for_ajax
- assert page.has_text? 'Started at'
- assert page.has_text? 'Finished at'
- assert page.has_text? 'log message 1'
- assert page.has_text? 'log message 2'
- assert page.has_text? 'log message 3'
- assert page.has_no_text? 'Showing only 100 bytes of this log'
- end
-
test 'view partial job log' do
+ need_selenium 'to be able to see the CORS response headers (PhantomJS 1.9.8 does not)'
+ use_keep_web_config
+
# This config will be restored during teardown by ../test_helper.rb:
Rails.configuration.log_viewer_max_bytes = 100
- IO.expects(:popen).returns(fakepipe_with_log_data)
- job = api_fixture('jobs')['job_with_real_log']
-
- visit page_with_token("active", "/jobs/#{job['uuid']}")
- assert page.has_text? job['script_version']
-
- find(:xpath, "//a[@href='#Log']").click
+ logdata = fakepipe_with_log_data.read
+ job_uuid = api_fixture('jobs')['running']['uuid']
+ logcollection = upload_data_and_get_collection(logdata, 'active', "#{job_uuid}.log.txt")
+ job = nil
+ use_token 'active' do
+ job = Job.find job_uuid
+ job.update_attributes log: logcollection.portable_data_hash
+ end
+ visit page_with_token 'active', '/jobs/'+job.uuid
+ find('a[href="#Log"]').click
wait_for_ajax
- assert page.has_text? 'Showing only 100 bytes of this log'
+ assert_text 'Showing only 100 bytes of this log'
end
test 'view log via keep-web redirect' do
@kwdport = getport 'keep-web-dl-ssl'
Rails.configuration.keep_web_url = "https://localhost:#{@kwport}/c=%{uuid_or_pdh}"
Rails.configuration.keep_web_download_url = "https://localhost:#{@kwdport}/c=%{uuid_or_pdh}"
- CollectionsController.any_instance.expects(:file_enumerator).never
end
end
end
end
end
+
+def upload_data_and_get_collection(data, user, filename, owner_uuid=nil)
+ token = api_fixture('api_client_authorizations')[user]['api_token']
+ datablock = `echo -n #{data.shellescape} | ARVADOS_API_TOKEN=#{token.shellescape} arv-put --no-progress --raw -`.strip
+ assert $?.success?, $?
+ col = nil
+ use_token user do
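+ # Keep manifest line format: ". <block locator> <offset>:<length>:<filename>\n"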
+ mtxt = ". #{datablock} 0:#{data.length}:#{filename}\n"
+ if owner_uuid
+ col = Collection.create(manifest_text: mtxt, owner_uuid: owner_uuid)
+ else
+ col = Collection.create(manifest_text: mtxt)
+ end
+ end
+ return col
+end
debian8,debian9,ubuntu1204,centos7|six|1.10.0|2|python3|all
debian8,debian9,ubuntu1204,ubuntu1404,centos7|requests|2.12.4|2|python3|all
debian8,debian9,ubuntu1204,ubuntu1404,ubuntu1604,centos7|websocket-client|0.37.0|2|python3|all
-ubuntu1204|requests|2.12.4|2|python|all
+ubuntu1204,ubuntu1404|requests|2.4.3|2|python|all
ubuntu1204,centos7|contextlib2|0.5.4|2|python|all
ubuntu1204,centos7|isodate|0.5.4|2|python|all
centos7|daemon|2.1.1|2|python|all
centos7|keepalive|0.5|2|python|all
debian8,debian9,ubuntu1204,ubuntu1404,ubuntu1604,centos7|lockfile|0.12.2|2|python|all|--epoch 1
all|ruamel.yaml|0.13.7|2|python|amd64|--python-setup-py-arguments --single-version-externally-managed
-all|cwltest|1.0.20160907111242|3|python|all|--depends 'python-futures >= 3.0.5'
+all|cwltest|1.0.20170809112706|3|python|all|--depends 'python-futures >= 3.0.5'
+all|junit-xml|1.7|3|python|all
all|rdflib-jsonld|0.4.0|2|python|all
all|futures|3.0.5|2|python|all
all|future|0.16.0|2|python|all
# clean up the docker build environment
cd "$WORKSPACE"
+title "Starting arvbox build localdemo"
+
tools/arvbox/bin/arvbox build localdemo
ECODE=$?
EXITCODE=$(($EXITCODE + $ECODE))
fi
+title "Starting arvbox build dev"
+
tools/arvbox/bin/arvbox build dev
ECODE=$?
# Go binaries
cd $WORKSPACE/packages/$TARGET
export GOPATH=$(mktemp -d)
+go get -v github.com/kardianos/govendor
package_go_binary sdk/go/crunchrunner crunchrunner \
"Crunchrunner executes a command inside a container and uploads the output"
package_go_binary services/arv-git-httpd arvados-git-httpd \
\cp config/application.yml.example config/application.yml -f
\cp config/environments/production.rb.example config/environments/production.rb -f
sed -i 's/secret_token: ~/secret_token: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/' config/application.yml
+ sed -i 's/keep_web_url: false/keep_web_url: exampledotcom/' config/application.yml
RAILS_ENV=production RAILS_GROUPS=assets bundle exec rake assets:precompile >/dev/null
mkdir -p "$GOPATH/src/git.curoverse.com"
ln -sfn "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+ (cd "$GOPATH/src/git.curoverse.com/arvados.git" && "$GOPATH/bin/govendor" sync -v)
cd "$GOPATH/src/git.curoverse.com/arvados.git/$src_path"
local version="$(version_from_git)"
local timestamp="$(timestamp_from_git)"
- # If the command imports anything from the Arvados SDK, bump the
- # version number and build a new package whenever the SDK changes.
+ # Update the version number and build a new package if the vendor
+ # bundle has changed, or the command imports anything from the
+ # Arvados SDK and the SDK has changed.
+ declare -a checkdirs=(vendor)
if grep -qr git.curoverse.com/arvados .; then
- cd "$GOPATH/src/git.curoverse.com/arvados.git/sdk/go"
- if [[ $(timestamp_from_git) -gt "$timestamp" ]]; then
+ checkdirs+=(sdk/go)
+ fi
+ for dir in ${checkdirs[@]}; do
+ cd "$GOPATH/src/git.curoverse.com/arvados.git/$dir"
+ ts="$(timestamp_from_git)"
+ if [[ "$ts" -gt "$timestamp" ]]; then
version=$(version_from_git)
+ timestamp="$ts"
fi
- fi
+ done
cd $WORKSPACE/packages/$TARGET
test_package_presence $prog $version go
services/keep-balance
services/login-sync
services/nodemanager
-services/nodemanager-integration
+services/nodemanager_integration
services/crunch-run
services/crunch-dispatch-local
services/crunch-dispatch-slurm
apps/workbench_units | apps/workbench_functionals | apps/workbench_integration)
suite=apps/workbench
;;
+ services/nodemanager | services/nodemanager_integration)
+ suite=services/nodemanager_suite
+ ;;
*)
suite="${1}"
;;
}
do_test services/login-sync login-sync
-test_nodemanager-integration() {
+test_nodemanager_integration() {
cd "$WORKSPACE/services/nodemanager" \
- && tests/integration_test.py ${testargs[services/nodemanager-integration]}
+ && tests/integration_test.py ${testargs[services/nodemanager_integration]}
}
-do_test services/nodemanager-integration nodemanager-integration
+do_test services/nodemanager_integration nodemanager_integration
for p in "${pythonstuff[@]}"
do
func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
content, err := ioutil.ReadAll(in)
if err != nil {
- r.Logger.Print(err)
+ r.Logger.Printf("warning: %v", err)
}
return content, err
}
statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
stats, err := ioutil.ReadFile(statsFilename)
if err != nil {
- r.Logger.Print(err)
+ r.Logger.Printf("notice: %v", err)
continue
}
return strings.NewReader(string(stats)), nil
select {
case <-ticker.C:
case <-r.done:
- r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
+ r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
return false
}
}
select {
case <-ticker.C:
case <-warningTimer:
- r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
+ r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
case <-r.done:
- r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
+ r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)
return false
}
}
<-done
if err != nil {
t.Fatal(err)
- } else if matched, err := regexp.MatchString("^read /proc/self/mem: .*", string(msg)); err != nil || !matched {
+ } else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched {
t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
}
}
delete $Jobstep->{tempfail};
$Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
- $Jobstep->{'arvados_task'}->save;
+ retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
splice @jobstep_todo, $todo_ptr, 1;
--$todo_ptr;
"ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
exit_status_s($childstatus)));
$Jobstep->{'arvados_task'}->{success} = 0;
- $Jobstep->{'arvados_task'}->save;
+ retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
$task_success = 0;
}
$Jobstep->{exitcode} = $childstatus;
$Jobstep->{finishtime} = time;
$Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
- $Jobstep->{'arvados_task'}->save;
+ retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
process_stderr_final ($jobstepidx);
Log ($jobstepidx, sprintf("task output (%d bytes): %s",
length($Jobstep->{'arvados_task'}->{output}),
$st->{node}->{fail_count}++;
}
}
- elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
+ elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
$jobstep[$jobstepidx]->{tempfail} = 1;
if (defined($job_slot_index)) {
$slot[$job_slot_index]->{node}->{fail_count}++;
- # that can be retried, the second function will be called with
- # the current try count (0-based), next try time, and error message.
+ # that can be retried. The second argument is a short description of
+ # the operation, used in the log message emitted when a try fails.
my $operation = shift;
- my $retry_callback = shift;
+ my $op_text = shift;
my $retries = retry_count();
+ my $retry_callback = sub {
+ my ($try_count, $next_try_at, $errmsg) = @_;
+ $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
+ $errmsg =~ s/\s/ /g;
+ $errmsg =~ s/\s+$//;
+ my $retry_msg;
+ if ($next_try_at < time) {
+ $retry_msg = "Retrying.";
+ } else {
+ my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
+ $retry_msg = "Retrying at $next_try_fmt.";
+ }
+ Log(undef, "$op_text failed: $errmsg. $retry_msg");
+ };
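+ # Exponential backoff: after failed try n, the next try is scheduled
+ # 2**n seconds later.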
foreach my $try_count (0..$retries) {
my $next_try = time + (2 ** $try_count);
my $result = eval { $operation->(@_); };
# This function will call that method, retrying as needed until
# the current retry_count is exhausted, with a log on the first failure.
my $method_name = shift;
- my $log_api_retry = sub {
- my ($try_count, $next_try_at, $errmsg) = @_;
- $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
- $errmsg =~ s/\s/ /g;
- $errmsg =~ s/\s+$//;
- my $retry_msg;
- if ($next_try_at < time) {
- $retry_msg = "Retrying.";
- } else {
- my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
- $retry_msg = "Retrying at $next_try_fmt.";
- }
- Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
- };
my $method = $arv;
foreach my $key (split(/\//, $method_name)) {
$method = $method->{$key};
}
- return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
+ return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
}
sub exit_status_s {
fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
num_retries=self.num_retries,
overrides=kwargs.get("override_tools"))
+ kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, **kwargs)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
if self.arvrunner.trash_intermediate:
command.append("--trash-intermediate")
+ if self.arvrunner.project_uuid:
+ command.append("--project-uuid="+self.arvrunner.project_uuid)
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
with SourceLine(dockerRequirement, "dockerImageId", WorkflowException):
sp = dockerRequirement["dockerImageId"].split(":")
image_name = sp[0]
- image_tag = sp[1] if len(sp) > 1 else None
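+    # Fall back to Docker's conventional "latest" tag when none is given.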
+ image_tag = sp[1] if len(sp) > 1 else "latest"
images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
image_name=image_name,
if project_uuid:
args.append("--project-uuid="+project_uuid)
args.append(image_name)
- if image_tag:
- args.append(image_tag)
- logger.info("Uploading Docker image %s", ":".join(args[1:]))
+ args.append(image_tag)
+ logger.info("Uploading Docker image %s:%s", image_name, image_tag)
try:
arvados.commands.keepdocker.main(args, stdout=sys.stderr)
except SystemExit as e:
self.arvrunner = arvrunner
self.work_api = kwargs["work_api"]
- def makeJobRunner(self, use_container=True):
+ def makeJobRunner(self, **kwargs):
if self.work_api == "containers":
return ArvadosContainer(self.arvrunner)
elif self.work_api == "jobs":
pipeline_template_uuid_pattern = re.compile(r'[a-z0-9]{5}-p5p6p-[a-z0-9]{15}')
def collectionResolver(api_client, document_loader, uri, num_retries=4):
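+    # keep: and arvwf: URIs are already resolved; return them unchanged.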
+ if uri.startswith("keep:") or uri.startswith("arvwf:"):
+ return uri
+
if workflow_uuid_pattern.match(uri):
return "arvwf:%s#main" % (uri)
class StagingPathMapper(PathMapper):
_follow_dirs = True
+ def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
+ self.targets = set()
+ super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
+
def visit(self, obj, stagedir, basedir, copy=False, staged=False):
# type: (Dict[unicode, Any], unicode, unicode, bool) -> None
loc = obj["location"]
tgt = os.path.join(stagedir, obj["basename"])
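+        # Identical basenames would collide in stagedir; rename later
+        # arrivals to name_2.ext, name_3.ext, ...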
+ basetgt, baseext = os.path.splitext(tgt)
+ n = 1
+ while tgt in self.targets:
+ n += 1
+ tgt = "%s_%i%s" % (basetgt, n, baseext)
+ self.targets.add(tgt)
if obj["class"] == "Directory":
self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
if loc.startswith("_:") or self._follow_dirs:
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+ else:
+ arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20170707200431',
- 'schema-salad==2.6.20170630075932',
+ 'cwltool==1.0.20170828135420',
+ 'schema-salad==2.6.20170712194300',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
'arvados-python-client>=0.1.20170526013812',
"uuid": "",
"portable_data_hash": "99999999999999999999999999999998+99",
"manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
- }}
+ },
+ "99999999999999999999999999999994+99": {
+ "uuid": "",
+ "portable_data_hash": "99999999999999999999999999999994+99",
+ "manifest_text": ". 99999999999999999999999999999994+99 0:0:expect_arvworkflow.cwl"
+ }
+ }
stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections)
'class': 'File',
'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
"nameext": ".txt",
- "nameroot": "blorp"
+ "nameroot": "blorp",
+ "size": 16
}},
'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
'listing': [
'class': 'File',
'location': u'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
"nameext": ".txt",
- "nameroot": "blorp"
+ "nameroot": "blorp",
+ "size": 16
},
'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
{'basename': 'renamed.txt',
stubs.expect_container_request_uuid + '\n')
+ @stubs
+ def test_submit_container_project(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--project-uuid="+project_uuid,
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["owner_uuid"] = project_uuid
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid,
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+
@stubs
def test_submit_job_runner_image(self, stubs):
capture_stdout = cStringIO.StringIO()
- id: '#main/x'
type: File
default: {class: File, location: 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- basename: blorp.txt, nameroot: blorp, nameext: .txt}
+ size: 16, basename: blorp.txt, nameroot: blorp, nameext: .txt}
- id: '#main/y'
type: Directory
default: {class: Directory, location: 'keep:99999999999999999999999999999998+99',
"$graph": [
{
"class": "Workflow",
+ "cwlVersion": "v1.0",
"hints": [],
"id": "#main",
"inputs": [
Path string `json:"path"`
Content interface{} `json:"content"`
ExcludeFromOutput bool `json:"exclude_from_output"`
- Capacity int64 `json:capacity`
+ Capacity int64 `json:"capacity"`
}
// RuntimeConstraints specify a container's compute resources (RAM,
}
func (e TransactionError) Error() (s string) {
- s = fmt.Sprintf("request failed: %s", e.URL)
+ s = fmt.Sprintf("request failed: %s", e.URL.String())
if e.Status != "" {
s = s + ": " + e.Status
}
Routes Routes
// If non-nil, Log is called after handling each request. The
- // error argument is nil if the request was succesfully
+ // error argument is nil if the request was successfully
// authenticated and served, even if the health check itself
// failed.
Log func(*http.Request, error)
$self->{'req'} = new HTTP::Request (%req);
$self->{'req'}->header('Authorization' => ('OAuth2 ' . $self->{'authToken'})) if $self->{'authToken'};
$self->{'req'}->header('Accept' => 'application/json');
+
+ # allow_nonref lets us encode JSON::true and JSON::false, see #12078
+ my $json = JSON->new->allow_nonref;
my ($p, $v);
while (($p, $v) = each %{$self->{'queryParams'}}) {
- $content{$p} = (ref($v) eq "") ? $v : JSON::encode_json($v);
+ $content{$p} = (ref($v) eq "") ? $v : $json->encode($v);
}
my $content;
while (($p, $v) = each %content) {
elif file_in_local_collection.permission_expired():
# Permission token expired, re-upload file. This will change whenever
# we have an API for refreshing tokens.
+ self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
should_upload = True
self._local_collection.remove(filename)
elif cached_file_data['size'] == file_in_local_collection.size():
if "job" in components[c]:
pipeline_jobs.add(components[c]["job"]["uuid"])
if known_component_jobs != pipeline_jobs:
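+            # Subscribe using the new filter before unsubscribing the old
+            # one, so no events are missed in between.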
+ new_filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
+ ws.subscribe(new_filters)
ws.unsubscribe(filters)
- filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
- ws.subscribe([['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]])
+ filters = new_filters
known_component_jobs = pipeline_jobs
api = arvados.api('v1')
sys.stdout.write(ev["properties"]["text"])
elif ev["event_type"] in ("create", "update"):
if ev["object_kind"] == "arvados#pipelineInstance":
- update_subscribed_components(ev["properties"]["new_attributes"]["components"])
+ c = api.pipeline_instances().get(uuid=ev["object_uuid"]).execute()
+ update_subscribed_components(c["components"])
if ev["object_kind"] == "arvados#pipelineInstance" and args.pipeline:
if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Paused"):
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
+job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
+container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
def clear_tmpdir(path=None):
from builtins import str
from builtins import range
import apiclient
+import datetime
+import hashlib
+import json
import mock
import os
import pwd
+import random
import re
import shutil
import subprocess
import sys
import tempfile
+import threading
import time
import unittest
-import yaml
-import threading
-import hashlib
-import random
import uuid
+import yaml
import arvados
import arvados.commands.put as arv_put
cls.ENVIRON = os.environ.copy()
cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
+ def datetime_to_hex(self, dt):
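+        # Keep permission hints append the token expiry as a hex Unix
+        # timestamp after '@'.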
+ return hex(int(time.mktime(dt.timetuple())))[2:]
+
def setUp(self):
super(ArvPutIntegrationTest, self).setUp()
arv_put.api_client = None
self.assertEqual(1, len(collection_list))
return collection_list[0]
+ def test_expired_token_invalidates_cache(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+ f.write('foo')
+ # Upload a directory and get the cache file name
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an expired access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+ cache['manifest'] = re.sub(
+ r'\@.*? ',
+ "@{} ".format(self.datetime_to_hex(a_month_ago)),
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect to get an invalid cache message
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(
+ err.decode(),
+ r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
+ self.assertEqual(p.returncode, 0)
+ # Confirm that the resulting cache is different from the last run.
+ with open(cache_filepath, 'r') as c2:
+ new_cache = json.load(c2)
+ self.assertNotEqual(cache['manifest'], new_cache['manifest'])
+
def test_put_collection_with_later_update(self):
tmpdir = self.make_tmpdir()
with open(os.path.join(tmpdir, 'file1'), 'w') as f:
activemodel (>= 3.0.0)
activesupport (>= 3.0.0)
rack (>= 1.1.0)
- addressable (2.5.0)
+ addressable (2.5.1)
public_suffix (~> 2.0, >= 2.0.2)
andand (1.3.3)
arel (6.0.4)
- arvados (0.1.20170215224121)
+ arvados (0.1.20170629115132)
activesupport (>= 3, < 4.2.6)
andand (~> 1.3, >= 1.3.3)
google-api-client (>= 0.7, < 0.8.9)
i18n (~> 0)
- json (~> 1.7, >= 1.7.7)
+ json (>= 1.7.7, < 3)
jwt (>= 0.1.5, < 2)
- arvados-cli (0.1.20170322173355)
+ arvados-cli (0.1.20170817171636)
activesupport (>= 3.2.13, < 5)
andand (~> 1.3, >= 1.3.3)
arvados (~> 0.1, >= 0.1.20150128223554)
curb (~> 0.8)
google-api-client (~> 0.6, >= 0.6.3, < 0.8.9)
- json (~> 1.7, >= 1.7.7)
+ json (>= 1.7.7, < 3)
oj (~> 2.0, >= 2.0.3)
trollop (~> 2.0)
autoparse (0.3.3)
hashie (3.5.5)
highline (1.7.8)
hike (1.2.3)
- i18n (0.8.1)
+ i18n (0.8.6)
jquery-rails (4.2.2)
rails-dom-testing (>= 1, < 3)
railties (>= 4.2.0)
addressable (~> 2.3)
libv8 (3.16.14.19)
little-plugger (1.1.4)
- logging (2.2.0)
+ logging (2.2.2)
little-plugger (~> 1.1)
multi_json (~> 1.10)
lograge (0.4.1)
nokogiri (>= 1.5.9)
mail (2.6.4)
mime-types (>= 1.16, < 4)
- memoist (0.15.0)
+ memoist (0.16.0)
metaclass (0.0.4)
mime-types (3.1)
mime-types-data (~> 3.2015)
mime-types-data (3.2016.0521)
mini_portile2 (2.1.0)
- minitest (5.10.1)
+ minitest (5.10.3)
mocha (1.2.1)
metaclass (~> 0.0.1)
multi_json (1.12.1)
thread_safe (0.3.6)
tilt (1.4.1)
trollop (2.1.2)
- tzinfo (1.2.2)
+ tzinfo (1.2.3)
thread_safe (~> 0.1)
uglifier (2.7.2)
execjs (>= 0.3.0)
uglifier (~> 2.0)
BUNDLED WITH
- 1.14.3
+ 1.15.1
before_filter :check_auth_header
def check_auth_header
- mgmt_token = Rails.configuration.management_token
+ mgmt_token = Rails.configuration.ManagementToken
auth_header = request.headers['Authorization']
if !mgmt_token
# Token to be included in all healthcheck requests. Disabled by default.
# Server expects request header of the format "Authorization: Bearer xxx"
- management_token: false
+ ManagementToken: false
development:
force_ssl: false
[true, 'Bearer configuredmanagementtoken', 200, '{"health":"OK"}'],
].each do |enabled, header, error_code, error_msg|
test "ping when #{if enabled then 'enabled' else 'disabled' end} with header '#{header}'" do
- Rails.configuration.management_token = 'configuredmanagementtoken' if enabled
+ Rails.configuration.ManagementToken = 'configuredmanagementtoken' if enabled
@request.headers['Authorization'] = header
get :ping
@catch_exceptions
def on_event(self, ev):
- if 'event_type' not in ev:
+ if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
return
with llfuse.lock:
- new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
- pdh = new_attrs.get("portable_data_hash")
- # new_attributes.modified_at currently lacks
- # subsecond precision (see #6347) so use event_at
- # which should always be the same.
- stamp = ev.get("event_at")
+ properties = ev.get("properties") or {}
+ old_attrs = properties.get("old_attributes") or {}
+ new_attrs = properties.get("new_attributes") or {}
for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
item.invalidate()
- if stamp and pdh and ev.get("object_kind") == "arvados#collection":
- item.update(to_record_version=(stamp, pdh))
- else:
- item.update()
-
- oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
+ if ev.get("object_kind") == "arvados#collection":
+ pdh = new_attrs.get("portable_data_hash")
+ # new_attributes.modified_at currently lacks
+ # subsecond precision (see #6347) so use event_at
+ # which should always be the same.
+ stamp = ev.get("event_at")
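+                    # Update to the event's (stamp, pdh) version only when
+                    # the collection is writable, loaded, modified, and not
+                    # trashed.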
+ if (stamp and pdh and item.writable() and
+ item.collection is not None and
+ item.collection.modified() and
+ new_attrs.get("is_trashed") is not True):
+ item.update(to_record_version=(stamp, pdh))
+
+ oldowner = old_attrs.get("owner_uuid")
newowner = ev.get("object_owner_uuid")
for parent in (
self.inodes.inode_cache.find_by_uuid(oldowner) +
self.inodes.inode_cache.find_by_uuid(newowner)):
- parent.invalidate()
- parent.update()
+ parent.child_event(ev)
@catch_exceptions
def getattr(self, inode, ctx=None):
self.logger.info("enable write is %s", self.args.enable_write)
def _setup_api(self):
- self.api = arvados.safeapi.ThreadSafeApiCache(
- apiconfig=arvados.config.settings(),
- keep_params={
- 'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
- 'num_retries': self.args.retries,
- })
+ try:
+ self.api = arvados.safeapi.ThreadSafeApiCache(
+ apiconfig=arvados.config.settings(),
+ keep_params={
+ 'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+ 'num_retries': self.args.retries,
+ })
+ except KeyError as e:
+ self.logger.error("Missing environment: %s", e)
+ exit(1)
# Do a sanity check that we have a working arvados host + token.
self.api.users().current().execute()
def finalize(self):
pass
+
+ def child_event(self, ev):
+ pass
self._poll_time = poll_time
self._updating_lock = threading.Lock()
self._current_user = None
+ self._full_listing = False
def want_event_subscribe(self):
return True
def uuid(self):
return self.project_uuid
+ def items(self):
+ self._full_listing = True
+ return super(ProjectDirectory, self).items()
+
+ def namefn(self, i):
+ if 'name' in i:
+ if i['name'] is None or len(i['name']) == 0:
+ return None
+ elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
+ # collection or subproject
+ return i['name']
+ elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
+ # name link
+ return i['name']
+ elif 'kind' in i and i['kind'].startswith('arvados#'):
+ # something else
+ return "{}.{}".format(i['name'], i['kind'][8:])
+ else:
+ return None
+
+
@use_counter
def update(self):
if self.project_object_file == None:
self.project_object_file = ObjectFile(self.inode, self.project_object)
self.inodes.add_entry(self.project_object_file)
- def namefn(i):
- if 'name' in i:
- if i['name'] is None or len(i['name']) == 0:
- return None
- elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
- # collection or subproject
- return i['name']
- elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
- # name link
- return i['name']
- elif 'kind' in i and i['kind'].startswith('arvados#'):
- # something else
- return "{}.{}".format(i['name'], i['kind'][8:])
- else:
- return None
+ if not self._full_listing:
+ return
def samefn(a, i):
if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
self.project_object = self.api.users().get(
uuid=self.project_uuid).execute(num_retries=self.num_retries)
- contents = arvados.util.list_all(self.api.groups().contents,
- self.num_retries, uuid=self.project_uuid)
+ contents = arvados.util.list_all(self.api.groups().list,
+ self.num_retries,
+ filters=[["owner_uuid", "=", self.project_uuid],
+ ["group_class", "=", "project"]])
+ contents.extend(arvados.util.list_all(self.api.collections().list,
+ self.num_retries,
+ filters=[["owner_uuid", "=", self.project_uuid]]))
# end with llfuse.lock_released, re-acquire lock
self.merge(contents,
- namefn,
+ self.namefn,
samefn,
self.createDirectory)
finally:
self._updating_lock.release()
+ def _add_entry(self, i, name):
+ ent = self.createDirectory(i)
+ self._entries[name] = self.inodes.add_entry(ent)
+ return self._entries[name]
+
@use_counter
@check_update
- def __getitem__(self, item):
- if item == '.arvados#project':
+ def __getitem__(self, k):
+ if k == '.arvados#project':
return self.project_object_file
- else:
- return super(ProjectDirectory, self).__getitem__(item)
+ elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
+ return super(ProjectDirectory, self).__getitem__(k)
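+        # Not in the cached listing: query the API directly, checking
+        # subprojects first, then collections.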
+ with llfuse.lock_released:
+ contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+ ["group_class", "=", "project"],
+ ["name", "=", k]],
+ limit=1).execute(num_retries=self.num_retries)["items"]
+ if not contents:
+ contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+ ["name", "=", k]],
+ limit=1).execute(num_retries=self.num_retries)["items"]
+ if contents:
+ name = sanitize_filename(self.namefn(contents[0]))
+ if name != k:
+ raise KeyError(k)
+ return self._add_entry(contents[0], name)
+
+ # Didn't find item
+ raise KeyError(k)
def __contains__(self, k):
if k == '.arvados#project':
return True
- else:
- return super(ProjectDirectory, self).__contains__(k)
+ try:
+ self[k]
+ return True
+ except KeyError:
+ pass
+ return False
@use_counter
@check_update
self._entries[name_new] = ent
self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
+ @use_counter
+ def child_event(self, ev):
+ properties = ev.get("properties") or {}
+ old_attrs = properties.get("old_attributes") or {}
+ new_attrs = properties.get("new_attributes") or {}
+ old_attrs["uuid"] = ev["object_uuid"]
+ new_attrs["uuid"] = ev["object_uuid"]
+ old_name = sanitize_filename(self.namefn(old_attrs))
+ new_name = sanitize_filename(self.namefn(new_attrs))
+
+ # create events will have a new name, but not an old name
+ # delete events will have an old name, but not a new name
+ # update events will have an old and new name, and they may be same or different
+ # if they are the same, an unrelated field changed and there is nothing to do.
+
+ if old_attrs.get("owner_uuid") != self.project_uuid:
+ # Was moved from somewhere else, so don't try to remove entry.
+ old_name = None
+ if ev.get("object_owner_uuid") != self.project_uuid:
+ # Was moved to somewhere else, so don't try to add entry
+ new_name = None
+
+ if ev.get("object_kind") == "arvados#collection":
+ if old_attrs.get("is_trashed"):
+ # Was previously deleted
+ old_name = None
+ if new_attrs.get("is_trashed"):
+ # Has been deleted
+ new_name = None
+
+ if new_name != old_name:
+ ent = None
+ if old_name in self._entries:
+ ent = self._entries[old_name]
+ del self._entries[old_name]
+ self.inodes.invalidate_entry(self.inode, old_name.encode(self.inodes.encoding))
+
+ if new_name:
+ if ent is not None:
+ self._entries[new_name] = ent
+ else:
+ self._add_entry(new_attrs, new_name)
+ elif ent is not None:
+ self.inodes.del_entry(ent)
+
class SharedDirectory(Directory):
"""A special directory that represents users or groups who have shared projects with me."""
attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
+def fuseSharedTestHelper(mounttmp):
+ class Test(unittest.TestCase):
+ def runTest(self):
+ # Double check that we can open and read objects in this folder as a file,
+ # and that its contents are what we expect.
+ baz_path = os.path.join(
+ mounttmp,
+ 'FUSE User',
+ 'FUSE Test Project',
+ 'collection in FUSE project',
+ 'baz')
+ with open(baz_path) as f:
+ self.assertEqual("baz", f.read())
+
+ # check mtime on collection
+ st = os.stat(baz_path)
+ try:
+ mtime = st.st_mtime_ns / 1000000000
+ except AttributeError:
+ mtime = st.st_mtime
+ self.assertEqual(mtime, 1391448174)
+
+ # shared_dirs is a list of the directories exposed
+ # by fuse.SharedDirectory (i.e. any object visible
+ # to the current user)
+ shared_dirs = llfuse.listdir(mounttmp)
+ shared_dirs.sort()
+ self.assertIn('FUSE User', shared_dirs)
+
+ # fuse_user_objs is a list of the objects owned by the FUSE
+ # test user (which present as files in the 'FUSE User'
+ # directory)
+ fuse_user_objs = llfuse.listdir(os.path.join(mounttmp, 'FUSE User'))
+ fuse_user_objs.sort()
+ self.assertEqual(['FUSE Test Project', # project owned by user
+ 'collection #1 owned by FUSE', # collection owned by user
+ 'collection #2 owned by FUSE' # collection owned by user
+ ], fuse_user_objs)
+
+ # test_proj_files is a list of the files in the FUSE Test Project.
+ test_proj_files = llfuse.listdir(os.path.join(mounttmp, 'FUSE User', 'FUSE Test Project'))
+ test_proj_files.sort()
+ self.assertEqual(['collection in FUSE project'
+ ], test_proj_files)
+
+
+ Test().runTest()
+
class FuseSharedTest(MountTestBase):
def runTest(self):
self.make_mount(fuse.SharedDirectory,
exclude=self.api.users().current().execute()['uuid'])
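+        # Store the "baz" data block so the fixture collection's content is
+        # readable through the mount.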
+ keep = arvados.keep.KeepClient()
+ keep.put("baz")
- # shared_dirs is a list of the directories exposed
- # by fuse.SharedDirectory (i.e. any object visible
- # to the current user)
- shared_dirs = llfuse.listdir(self.mounttmp)
- shared_dirs.sort()
- self.assertIn('FUSE User', shared_dirs)
-
- # fuse_user_objs is a list of the objects owned by the FUSE
- # test user (which present as files in the 'FUSE User'
- # directory)
- fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
- fuse_user_objs.sort()
- self.assertEqual(['FUSE Test Project', # project owned by user
- 'collection #1 owned by FUSE', # collection owned by user
- 'collection #2 owned by FUSE', # collection owned by user
- 'pipeline instance owned by FUSE.pipelineInstance', # pipeline instance owned by user
- ], fuse_user_objs)
-
- # test_proj_files is a list of the files in the FUSE Test Project.
- test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
- test_proj_files.sort()
- self.assertEqual(['collection in FUSE project',
- 'pipeline instance in FUSE project.pipelineInstance',
- 'pipeline template in FUSE project.pipelineTemplate'
- ], test_proj_files)
-
- # Double check that we can open and read objects in this folder as a file,
- # and that its contents are what we expect.
- pipeline_template_path = os.path.join(
- self.mounttmp,
- 'FUSE User',
- 'FUSE Test Project',
- 'pipeline template in FUSE project.pipelineTemplate')
- with open(pipeline_template_path) as f:
- j = json.load(f)
- self.assertEqual("pipeline template in FUSE project", j['name'])
-
- # check mtime on template
- st = os.stat(pipeline_template_path)
- try:
- mtime = st.st_mtime_ns / 1000000000
- except AttributeError:
- mtime = st.st_mtime
- self.assertEqual(mtime, 1397493304)
-
- # check mtime on collection
- st = os.stat(os.path.join(
- self.mounttmp,
- 'FUSE User',
- 'collection #1 owned by FUSE'))
- try:
- mtime = st.st_mtime_ns / 1000000000
- except AttributeError:
- mtime = st.st_mtime
- self.assertEqual(mtime, 1391448174)
+ self.pool.apply(fuseSharedTestHelper, (self.mounttmp,))
class FuseHomeTest(MountTestBase):
// SSL certificates. See
// http://www.w3.org/TR/cors/#user-credentials).
w.Header().Set("Access-Control-Allow-Origin", "*")
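+	// Expose Content-Range so cross-origin scripts (e.g. Workbench's log viewer) can read it from Range responses.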
+ w.Header().Set("Access-Control-Expose-Headers", "Content-Range")
}
arv := h.clientPool.Get()
&s3UnsafeDelete,
"s3-unsafe-delete",
false,
- "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
+ "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
}
// S3Volume implements Volume using an S3 bucket.
def __init__(self, *args, **kwargs):
super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
self.actor_ref = TellableActorRef(self)
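+        # Let tests inject killfunc to observe fatal-error handling without
+        # killing the test process.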
+ self._killfunc = kwargs.get("killfunc", os.kill)
def on_failure(self, exception_type, exception_value, tb):
lg = getattr(self, "_logger", logging)
if (exception_type in (threading.ThreadError, MemoryError) or
exception_type is OSError and exception_value.errno == errno.ENOMEM):
lg.critical("Unhandled exception is a fatal error, killing Node Manager")
- os.kill(os.getpid(), signal.SIGKILL)
+ self._killfunc(os.getpid(), signal.SIGKILL)
def ping(self):
return True
+ def get_thread(self):
+ return threading.current_thread()
class WatchdogActor(pykka.ThreadingActor):
def __init__(self, timeout, *args, **kwargs):
self.actors = [a.proxy() for a in args]
self.actor_ref = TellableActorRef(self)
self._later = self.actor_ref.tell_proxy()
+ self._killfunc = kwargs.get("killfunc", os.kill)
def kill_self(self, e, act):
lg = getattr(self, "_logger", logging)
lg.critical("Watchdog exception", exc_info=e)
lg.critical("Actor %s watchdog ping time out, killing Node Manager", act)
- os.kill(os.getpid(), signal.SIGKILL)
+ self._killfunc(os.getpid(), signal.SIGKILL)
def on_start(self):
self._later.run()
return super(ComputeNodeShutdownActor, self)._finished()
def cancel_shutdown(self, reason, **kwargs):
+ if self.cancel_reason is not None:
+ # already cancelled
+ return
self.cancel_reason = reason
self._logger.info("Shutdown cancelled: %s.", reason)
self._finished(success_flag=False)
@_cancel_on_exception
def shutdown_node(self):
+ if self.cancel_reason is not None:
+ # already cancelled
+ return
if self.cancellable:
self._logger.info("Checking that node is still eligible for shutdown")
eligible, reason = self._monitor.shutdown_eligible().get()
if output in ("drng\n", "alloc\n", "drng*\n", "alloc*\n"):
self._timer.schedule(time.time() + 10,
self._later.await_slurm_drain)
- elif output in ("idle\n"):
+ elif output in ("idle\n",):
# Not in "drng" but idle, don't shut down
self.cancel_shutdown("slurm state is %s" % output.strip(), try_resume=False)
else:
'watchdog': '600',
'node_mem_scaling': '0.95'},
'Manage': {'address': '127.0.0.1',
- 'port': '-1'},
+ 'port': '-1',
+ 'ManagementToken': ''},
'Logging': {'file': '/dev/stderr',
'level': 'WARNING'}
}.iteritems():
import logging
import subprocess
+import arvados.util
+
from . import clientactor
from .config import ARVADOS_ERRORS
+
class ServerCalculator(object):
"""Generate cloud server wishlists from an Arvados job queue.
self.max_nodes = max_nodes or float('inf')
self.max_price = max_price or float('inf')
self.logger = logging.getLogger('arvnodeman.jobqueue')
- self.logged_jobs = set()
self.logger.info("Using cloud node sizes:")
for s in self.cloud_sizes:
def servers_for_queue(self, queue):
servers = []
- seen_jobs = set()
+ unsatisfiable_jobs = {}
for job in queue:
- seen_jobs.add(job['uuid'])
constraints = job['runtime_constraints']
want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
cloud_size = self.cloud_size_for_constraints(constraints)
if cloud_size is None:
- if job['uuid'] not in self.logged_jobs:
- self.logged_jobs.add(job['uuid'])
- self.logger.debug("job %s not satisfiable", job['uuid'])
- elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price):
+ unsatisfiable_jobs[job['uuid']] = (
+ 'Requirements for a single node exceed the available '
+ 'cloud node size')
+ elif (want_count > self.max_nodes):
+ unsatisfiable_jobs[job['uuid']] = (
+ "Job's min_nodes constraint is greater than the configured "
+ "max_nodes (%d)" % self.max_nodes)
+ elif (want_count*cloud_size.price <= self.max_price):
servers.extend([cloud_size.real] * want_count)
- self.logged_jobs.intersection_update(seen_jobs)
- return servers
+ else:
+ unsatisfiable_jobs[job['uuid']] = (
+ "Job's price (%d) is above system's max_price "
+ "limit (%d)" % (want_count*cloud_size.price, self.max_price))
+ return (servers, unsatisfiable_jobs)
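
servers_for_queue() now returns a (servers, unsatisfiable_jobs) pair instead of a bare list, so every caller must unpack both values. A sketch of the new contract, with calc assumed to be a configured ServerCalculator and the job dict abbreviated:

    queue = [{'uuid': 'zzzzz-8i9sb-000000000000000',
              'runtime_constraints': {'min_nodes': 2}}]
    servers, unsatisfiable = calc.servers_for_queue(queue)
    for uuid, reason in unsatisfiable.iteritems():  # Python 2, as in this codebase
        print('%s: %s' % (uuid, reason))
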
def cheapest_size(self):
return self.cloud_sizes[0]
return s
return None
+
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
"""Actor to generate server wishlists from the job queue.
for out in squeue_out.splitlines():
try:
cpu, ram, disk, reason, jobname = out.split("|", 4)
- if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
+ if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
queuelist.append({
"uuid": jobname,
"runtime_constraints": {
return queuelist
def _got_response(self, queue):
- server_list = self._calculator.servers_for_queue(queue)
+ server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
+ # Cancel any job/container with unsatisfiable requirements, emitting
+ # a log explaining why.
+ for job_uuid, reason in unsatisfiable_jobs.iteritems():
+ try:
+ self._client.logs().create(body={
+ 'object_uuid': job_uuid,
+ 'event_type': 'stderr',
+ 'properties': {'text': reason},
+ }).execute()
+ # Cancel the job depending on its type
+ if arvados.util.container_uuid_pattern.match(job_uuid):
+ subprocess.check_call(['scancel', '--name='+job_uuid])
+ elif arvados.util.job_uuid_pattern.match(job_uuid):
+ self._client.jobs().cancel(uuid=job_uuid).execute()
+ else:
+ raise Exception('Unknown job type')
+ self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid)
+ except Exception as error:
+ self._logger.error("Trying to cancel job '%s': %s",
+ job_uuid,
+ error)
self._logger.debug("Calculated wishlist: %s",
', '.join(s.name for s in server_list) or "(empty)")
return super(JobQueueMonitorActor, self)._got_response(server_list)
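
The cancel path above picks its mechanism by UUID type: a crunch2 container shows up as a slurm job named after its UUID (hence scancel --name), while a crunch1 job is cancelled through the API server. A standalone sketch of the same dispatch, relying on the UUID regexes in arvados.util:

    import subprocess
    import arvados.util

    def cancel_process(client, uuid):
        if arvados.util.container_uuid_pattern.match(uuid):
            # crunch2: the slurm job is named after the container UUID
            subprocess.check_call(['scancel', '--name=' + uuid])
        elif arvados.util.job_uuid_pattern.match(uuid):
            # crunch1: cancel through the API server
            client.jobs().cancel(uuid=uuid).execute()
        else:
            raise ValueError('not a job or container UUID: %r' % uuid)
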
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(tracker.get_json())
+ elif self.path == '/_health/ping':
+ code, msg = self.check_auth()
+
+ if code != 200:
+ self.send_response(code)
+ self.end_headers()
+ self.wfile.write(msg)
+ else:
+ self.send_response(200)
+ self.send_header('Content-type', 'application/json')
+ self.end_headers()
+ self.wfile.write(json.dumps({"health":"OK"}))
else:
self.send_response(404)
def log_message(self, fmt, *args, **kwargs):
_logger.info(fmt, *args, **kwargs)
+ def check_auth(self):
+ mgmt_token = self.server._config.get('Manage', 'ManagementToken')
+ auth_header = self.headers.get('Authorization', None)
+
+ if mgmt_token == '':
+ return 404, "disabled"
+ elif auth_header is None:
+ return 401, "authorization required"
+ elif auth_header != 'Bearer '+mgmt_token:
+ return 403, "authorization error"
+ return 200, ""
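
check_auth() gives the management endpoint three distinct failure modes: 404 while no ManagementToken is configured, 401 with no Authorization header, and 403 on a mismatched token. A quick client sketch (hypothetical address and token):

    import requests

    r = requests.get('http://127.0.0.1:8989/_health/ping',
                     headers={'Authorization': 'Bearer configuredmanagementtoken'})
    assert r.status_code == 200
    assert r.json() == {'health': 'OK'}
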
class Tracker(object):
def __init__(self):
message at a later time. This actor runs the necessary event loop for
delivery.
"""
- def __init__(self, max_sleep=1):
+ def __init__(self, max_sleep=1, timefunc=None):
super(TimedCallBackActor, self).__init__()
self._proxy = self.actor_ref.tell_proxy()
self.messages = []
self.max_sleep = max_sleep
+ if timefunc is None:
+ self._timefunc = time.time
+ else:
+ self._timefunc = timefunc
def schedule(self, delivery_time, receiver, *args, **kwargs):
if not self.messages:
def deliver(self):
if not self.messages:
return
- til_next = self.messages[0][0] - time.time()
+ til_next = self.messages[0][0] - self._timefunc()
if til_next <= 0:
t, receiver, args, kwargs = heapq.heappop(self.messages)
try:
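
Like killfunc above, timefunc is injected purely for testability: a fake clock can be advanced explicitly instead of the test sleeping through real time. A minimal sketch, with receiver standing in for any callable:

    import mock

    clock = mock.Mock(return_value=0)
    receiver = mock.Mock()
    deliverer = TimedCallBackActor.start(timefunc=clock).proxy()
    deliverer.schedule(1, receiver, 'later')   # not due at t=0
    clock.return_value = 2                     # "advance" the clock
    deliverer.schedule(3, receiver, 'nudge')   # the delivery loop now sees t=2
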
],
install_requires=[
'apache-libcloud>=0.20',
- 'arvados-python-client>=0.1.20150206225333',
+ 'arvados-python-client>=0.1.20170731145219',
'future',
'pykka',
'python-daemon',
fake_slurm = None
compute_nodes = None
all_jobs = None
+unsatisfiable_job_scancelled = None
def update_script(path, val):
with open(path+"_", "w") as f:
"\n".join("echo '1|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
return 0
+def set_queue_unsatisfiable(g):
+ global all_jobs, unsatisfiable_job_scancelled
+ # Simulate a job requesting a 99 core node.
+ update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
+ "\n".join("echo '99|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+ update_script(os.path.join(fake_slurm, "scancel"), "#!/bin/sh\n" +
+ "\ntouch %s" % unsatisfiable_job_scancelled)
+ return 0
+
+def job_cancelled(g):
+ global unsatisfiable_job_scancelled
+ cancelled_job = g.group(1)
+ api = arvados.api('v1')
+ # Check that 'scancel' was called
+ if not os.path.isfile(unsatisfiable_job_scancelled):
+ return 1
+ # Check for the log entry
+ log_entry = api.logs().list(
+ filters=[
+ ['object_uuid', '=', cancelled_job],
+ ['event_type', '=', 'stderr'],
+ ]).execute()['items'][0]
+ if not re.match(
+ r"Requirements for a single node exceed the available cloud node size",
+ log_entry['properties']['text']):
+ return 1
+ return 0
def node_paired(g):
global compute_nodes
def run_test(name, actions, checks, driver_class, jobs, provider):
code = 0
+ global unsatisfiable_job_scancelled
+ unsatisfiable_job_scancelled = os.path.join(tempfile.mkdtemp(),
+ "scancel_called")
# Delete any stale node records
api = arvados.api('v1')
# Test main loop:
# - Read line
- # - Apply negative checks (thinks that are not supposed to happen)
+ # - Apply negative checks (things that are not supposed to happen)
# - Check timeout
# - Check if the next action should trigger
# - If all actions are exhausted, terminate with test success
code = 1
shutil.rmtree(fake_slurm)
+ shutil.rmtree(os.path.dirname(unsatisfiable_job_scancelled))
if code == 0:
logger.info("%s passed", name)
# Test lifecycle.
tests = {
+ "test_unsatisfiable_jobs" : (
+ # Actions (pattern -> action)
+ [
+ (r".*Daemon started", set_queue_unsatisfiable),
+ (r".*Cancelled unsatisfiable job '(\S+)'", job_cancelled),
+ ],
+ # Checks (things that shouldn't happen)
+ {
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": fail,
+ r".*Trying to cancel job '(\S+)'": fail,
+ },
+ # Driver class
+ "arvnodeman.test.fake_driver.FakeDriver",
+ # Jobs
+ {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_single_node_azure": (
[
(r".*Daemon started", set_squeue),
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+#
+#
+# Usage: arvados-cwl-runner stress_test.cwl
+#
+# Submits 100 jobs or containers, creating load on node manager and
+# scheduler.
+
+class: Workflow
+cwlVersion: v1.0
+requirements:
+ ScatterFeatureRequirement: {}
+ InlineJavascriptRequirement: {}
+inputs: []
+outputs: []
+steps:
+ step1:
+ in: []
+ out: [out]
+ run:
+ class: ExpressionTool
+ inputs: []
+ outputs:
+ out: int[]
+ expression: |
+ ${
+ var r = [];
+ for (var i = 1; i <= 100; i++) {
+ r.push(i);
+ }
+ return {out: r};
+ }
+ step2:
+ in:
+ num: step1/out
+ out: []
+ scatter: num
+ run:
+ class: CommandLineTool
+ requirements:
+ ShellCommandRequirement: {}
+ inputs:
+ num: int
+ outputs: []
+ arguments: [echo, "starting",
+ {shellQuote: false, valueFrom: "&&"},
+ sleep, $((101-inputs.num)*2),
+ {shellQuote: false, valueFrom: "&&"},
+ echo, "the number of the day is", $(inputs.num)]
]
self.make_actor()
self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ self.setup_actor.ping().get(self.TIMEOUT)
self.assertEqual(1, self.cloud_client.post_create_node.call_count)
def test_instance_exceeded_not_retried(self):
self.api_client.nodes().create().execute.side_effect = retry_resp
self.api_client.nodes().update().execute.side_effect = retry_resp
self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ self.setup_actor.ping().get(self.TIMEOUT)
self.assertEqual(self.setup_actor.actor_ref.actor_urn,
subscriber.call_args[0][0].actor_ref.actor_urn)
self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
self.cloud_client.destroy_node.return_value = True
self.make_actor(cancellable=True)
- self.check_success_flag(False)
+ self.check_success_flag(False, 2)
self.assertFalse(self.cloud_client.destroy_node.called)
def test_uncancellable_shutdown(self, *mocks):
self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
self.cloud_client.destroy_node.return_value = True
self.make_actor(cancellable=False)
- self.check_success_flag(True, 2)
+ self.check_success_flag(True, 4)
self.assertTrue(self.cloud_client.destroy_node.called)
def test_arvados_node_cleaned_after_shutdown(self, *mocks):
+ if len(mocks) == 1:
+ mocks[0].return_value = "drain\n"
cloud_node = testutil.cloud_node_mock(62)
arv_node = testutil.arvados_node_mock(62)
self.make_mocks(cloud_node, arv_node)
self.assertTrue(update_mock().execute.called)
def test_arvados_node_not_cleaned_after_shutdown_cancelled(self, *mocks):
+ if len(mocks) == 1:
+ mocks[0].return_value = "idle\n"
cloud_node = testutil.cloud_node_mock(61)
arv_node = testutil.arvados_node_mock(61)
self.make_mocks(cloud_node, arv_node, shutdown_open=False)
self.cloud_client.destroy_node.return_value = False
self.make_actor(cancellable=True)
self.shutdown_actor.cancel_shutdown("test")
+ self.shutdown_actor.ping().get(self.TIMEOUT)
self.check_success_flag(False, 2)
self.assertFalse(self.arvados_client.nodes().update.called)
def test_in_state_when_no_state_available(self):
self.make_actor(arv_node=testutil.arvados_node_mock(
crunch_worker_state=None))
- print(self.node_actor.get_state().get())
self.assertTrue(self.node_state('idle'))
def test_in_state_when_no_state_available_old(self):
self.make_actor(arv_node=testutil.arvados_node_mock(
crunch_worker_state=None, age=90000))
- print(self.node_actor.get_state().get())
self.assertTrue(self.node_state('down'))
def test_in_idle_state(self):
self.timer = testutil.MockTimer(False)
self.make_actor()
self.check_success_flag(None, 0)
+ # At this point, 1st try should have happened.
+
self.timer.deliver()
self.check_success_flag(None, 0)
- self.timer.deliver()
+ # At this point, 2nd try should have happened.
+
# Order is critical here: if the mock gets called when no return value
# or side effect is set, we may invoke a real subprocess.
proc_mock.return_value = end_state
proc_mock.side_effect = None
+
+ # 3rd try
+ self.timer.deliver()
+
self.check_success_flag(True, 3)
self.check_slurm_got_args(proc_mock, 'NodeName=compute63')
self.check_success_flag(True)
self.assertFalse(proc_mock.called)
- def test_node_undrained_when_shutdown_cancelled(self, proc_mock):
+ def test_node_resumed_when_shutdown_cancelled(self, proc_mock):
try:
proc_mock.side_effect = iter(['', 'drng\n', 'drng\n', ''])
self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
self.timer = testutil.MockTimer(False)
self.make_actor()
self.busywait(lambda: proc_mock.call_args is not None)
- self.shutdown_actor.cancel_shutdown("test").get(self.TIMEOUT)
+ self.shutdown_actor.cancel_shutdown("test")
self.check_success_flag(False, 2)
- self.assertEqual(proc_mock.call_args_list,
- [mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']),
- mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
- mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
- mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME'])])
+ self.assertEqual(proc_mock.call_args_list[0], mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']))
+ self.assertEqual(proc_mock.call_args_list[-1], mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME']))
+
finally:
self.shutdown_actor.actor_ref.stop()
proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n', 'idle\n'])
self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
self.make_actor()
- self.check_success_flag(False, 2)
+ self.check_success_flag(False, 5)
def test_issue_slurm_drain_retry(self, proc_mock):
- proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
+ proc_mock.side_effect = iter([OSError, OSError, 'drng\n', 'drain\n'])
self.check_success_after_reset(proc_mock, timer=False)
def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
+
+ def busywait(self, f):
+ n = 0
+ while not f() and n < 200:
+ time.sleep(.1)
+ self.daemon.ping().get(self.TIMEOUT)
+ n += 1
+ self.assertTrue(f())
+
def mock_node_start(self, **kwargs):
# Make sure that every time the daemon starts a setup actor,
# it gets a new mock object back.
self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
def monitor_list(self):
- return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
+ return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
- def monitored_arvados_nodes(self):
+ def monitored_arvados_nodes(self, include_unpaired=True):
pairings = []
for future in [actor.proxy().arvados_node
for actor in self.monitor_list()]:
try:
- pairings.append(future.get(self.TIMEOUT))
+ g = future.get(self.TIMEOUT)
+ if g or include_unpaired:
+ pairings.append(g)
except pykka.ActorDeadError:
pass
return pairings
def alive_monitor_count(self):
return len(self.monitored_arvados_nodes())
+ def paired_monitor_count(self):
+ return len(self.monitored_arvados_nodes(False))
+
def assertShutdownCancellable(self, expected=True):
self.assertTrue(self.node_shutdown.start.called)
self.assertIs(expected,
def test_easy_node_creation(self):
size = testutil.MockSize(1)
self.make_daemon(want_sizes=[size])
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_setup.start.called)
+ self.busywait(lambda: self.node_setup.start.called)
def check_monitors_arvados_nodes(self, *arv_nodes):
+ self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
def test_node_pairing(self):
cloud_node = testutil.cloud_node_mock(1)
arv_node = testutil.arvados_node_mock(1)
self.make_daemon([cloud_node], [arv_node])
- self.stop_proxy(self.daemon)
self.check_monitors_arvados_nodes(arv_node)
def test_node_pairing_after_arvados_update(self):
[testutil.arvados_node_mock(1, ip_address=None)])
arv_node = testutil.arvados_node_mock(2)
self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
self.check_monitors_arvados_nodes(arv_node)
def test_arvados_node_un_and_re_paired(self):
self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
self.check_monitors_arvados_nodes(arv_node)
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
- self.assertEqual(0, self.alive_monitor_count())
+ self.busywait(lambda: 0 == self.alive_monitor_count())
self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
- self.stop_proxy(self.daemon)
self.check_monitors_arvados_nodes(arv_node)
def test_old_arvados_node_not_double_assigned(self):
def test_node_count_satisfied(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
want_sizes=[testutil.MockSize(1)])
- self.stop_proxy(self.daemon)
- self.assertFalse(self.node_setup.start.called)
+ self.busywait(lambda: not self.node_setup.start.called)
def test_dont_count_missing_as_busy(self):
size = testutil.MockSize(1)
2,
last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size, size])
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_setup.start.called)
+ self.busywait(lambda: 2 == self.alive_monitor_count())
+ self.busywait(lambda: self.node_setup.start.called)
def test_missing_counts_towards_max(self):
size = testutil.MockSize(1)
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size, size],
max_nodes=2)
- self.stop_proxy(self.daemon)
- self.assertFalse(self.node_setup.start.called)
+ self.busywait(lambda: not self.node_setup.start.called)
def test_excess_counts_missing(self):
size = testutil.MockSize(1)
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
- self.assertEqual(2, self.alive_monitor_count())
+ self.busywait(lambda: 2 == self.paired_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.assertEqual(1, self.node_shutdown.start.call_count)
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
-
+ self.busywait(lambda: 2 == self.paired_monitor_count())
get_cloud_node = mock.MagicMock(name="get_cloud_node")
get_cloud_node.get.return_value = cloud_nodes[1]
mock_node_monitor = mock.MagicMock()
self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
- self.assertEqual(2, self.alive_monitor_count())
+ self.busywait(lambda: 2 == self.alive_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
- self.assertEqual(1, self.node_shutdown.start.call_count)
+ self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
def test_booting_nodes_counted(self):
cloud_node = testutil.cloud_node_mock(1)
self.daemon.max_nodes.get(self.TIMEOUT)
self.assertTrue(self.node_setup.start.called)
self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertEqual(1, self.node_setup.start.call_count)
+ self.busywait(lambda: 1 == self.node_setup.start.call_count)
def test_boot_new_node_when_all_nodes_busy(self):
size = testutil.MockSize(2)
arv_node = testutil.arvados_node_mock(2, job_uuid=True)
self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
[size], avail_sizes=[(size, {"cores":1})])
+ self.busywait(lambda: 1 == self.paired_monitor_count())
self.busywait(lambda: self.node_setup.start.called)
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_setup.start.called)
def test_boot_new_node_below_min_nodes(self):
min_size = testutil.MockSize(1)
now = time.time()
self.monitor_list()[0].tell_proxy().consider_shutdown()
self.busywait(lambda: self.node_shutdown.start.called)
- self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
def test_booted_node_shut_down_when_never_paired(self):
self.daemon.update_cloud_nodes([cloud_node])
self.monitor_list()[0].tell_proxy().consider_shutdown()
self.busywait(lambda: self.node_shutdown.start.called)
- self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
def test_booted_node_shut_down_when_never_working(self):
self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
self.daemon.update_cloud_nodes([cloud_node])
self.busywait(lambda: self.node_shutdown.start.called)
- self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
def test_node_that_pairs_not_considered_failed_boot(self):
def test_booting_nodes_shut_down(self):
self.make_daemon(want_sizes=[testutil.MockSize(1)])
self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
+ self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
def test_all_booting_nodes_tried_to_shut_down(self):
size = testutil.MockSize(2)
arv_node = testutil.arvados_node_mock(1)
size = testutil.MockSize(1)
self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.paired_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
cloud_node = testutil.cloud_node_mock(1)
arv_node = testutil.arvados_node_mock(1)
self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.paired_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
def test_shutdown_accepted_below_capacity(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.alive_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_shutdown.start.called)
+ self.busywait(lambda: self.node_shutdown.start.called)
def test_shutdown_declined_when_idle_and_job_queued(self):
size = testutil.MockSize(1)
arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
testutil.arvados_node_mock(4, job_uuid=None)]
self.make_daemon(cloud_nodes, arv_nodes, [size])
- self.assertEqual(2, self.alive_monitor_count())
+ self.busywait(lambda: 2 == self.paired_monitor_count())
for mon_ref in self.monitor_list():
monitor = mon_ref.proxy()
if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.last_shutdown.success.get.return_value = False
self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.paired_monitor_count())
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.last_shutdown.success.get.return_value = True
self.last_shutdown.stop.side_effect = lambda: monitor.stop()
self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
- self.assertEqual(0, self.alive_monitor_count())
+ self.busywait(lambda: 0 == self.paired_monitor_count())
def test_nodes_shutting_down_replaced_below_max_nodes(self):
size = testutil.MockSize(6)
self.assertTrue(self.node_shutdown.start.called)
self.daemon.update_server_wishlist(
[testutil.MockSize(6)]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_setup.start.called)
+ self.busywait(lambda: self.node_setup.start.called)
def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
cloud_node = testutil.cloud_node_mock(7)
self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
max_nodes=1)
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.paired_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.assertTrue(self.node_shutdown.start.called)
self.daemon.update_server_wishlist(
[testutil.MockSize(7)]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertFalse(self.node_setup.start.called)
+ self.busywait(lambda: not self.node_setup.start.called)
def test_nodes_shutting_down_count_against_excess(self):
size = testutil.MockSize(8)
arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
self.make_daemon(cloud_nodes, arv_nodes, [size],
avail_sizes=[(size, {"cores":1})])
- self.assertEqual(2, self.alive_monitor_count())
+ self.busywait(lambda: 2 == self.paired_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.assertEqual(1, self.node_shutdown.start.call_count)
size = testutil.MockSize(2)
self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
self.timer.deliver()
- self.stop_proxy(self.daemon)
- self.assertEqual(1, self.node_setup.start.call_count)
+ self.busywait(lambda: 1 == self.node_setup.start.call_count)
def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertEqual(
- 1, self.last_shutdown.stop.call_count)
+ self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
# the ActorDeadError.
self.last_shutdown.stop.side_effect = pykka.ActorDeadError
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertEqual(1, self.last_shutdown.stop.call_count)
+ self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
def test_node_create_two_sizes(self):
small = testutil.MockSize(1)
testutil.arvados_node_mock(3)],
want_sizes=[small, small, big],
avail_sizes=avail_sizes)
-
+ self.busywait(lambda: 3 == self.paired_monitor_count())
self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
self.assertEqual(0, self.node_shutdown.start.call_count)
booting = self.daemon.booting.get()
cloud_nodes = self.daemon.cloud_nodes.get()
- self.stop_proxy(self.daemon)
+ self.busywait(lambda: 1 == self.node_setup.start.call_count)
+ self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
- self.assertEqual(1, self.node_setup.start.call_count)
- self.assertEqual(1, self.node_shutdown.start.call_count)
+ self.stop_proxy(self.daemon)
# booting a new big node
sizecounts = {a[0].id: 0 for a in avail_sizes}
import arvnodeman.baseactor
class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
- def __init__(self, e):
- super(BogusActor, self).__init__()
+ def __init__(self, e, killfunc=None):
+ super(BogusActor, self).__init__(killfunc=killfunc)
self.exp = e
def doStuff(self):
def ping(self):
# Called by WatchdogActorTest, this delay is longer than the test timeout
# of 1 second, which should cause the watchdog ping to fail.
- time.sleep(4)
+ time.sleep(2)
return True
class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
def test_fatal_error(self):
for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
- with mock.patch('os.kill') as kill_mock:
- act = BogusActor.start(e).tell_proxy()
- act.doStuff()
- act.actor_ref.stop(block=True)
- self.assertTrue(kill_mock.called)
-
- @mock.patch('os.kill')
- def test_nonfatal_error(self, kill_mock):
- act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+ kill_mock = mock.Mock(name='os.kill')
+ bgact = BogusActor.start(e, killfunc=kill_mock)
+ act_thread = bgact.proxy().get_thread().get()
+ act = bgact.tell_proxy()
+ act.doStuff()
+ act.actor_ref.stop(block=True)
+ act_thread.join()
+ self.assertTrue(kill_mock.called)
+
+ def test_nonfatal_error(self):
+ kill_mock = mock.Mock(name='os.kill')
+ act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
act.doStuff()
act.actor_ref.stop(block=True)
self.assertFalse(kill_mock.called)
class WatchdogActorTest(testutil.ActorTestMixin, unittest.TestCase):
- @mock.patch('os.kill')
- def test_time_timout(self, kill_mock):
+
+ def test_time_timeout(self):
+ kill_mock = mock.Mock(name='os.kill')
act = BogusActor.start(OSError(errno.ENOENT, ""))
- watch = arvnodeman.baseactor.WatchdogActor.start(1, act)
+ watch = arvnodeman.baseactor.WatchdogActor.start(1, act, killfunc=kill_mock)
+ time.sleep(1)
watch.stop(block=True)
act.stop(block=True)
self.assertTrue(kill_mock.called)
def test_empty_queue_needs_no_servers(self):
servcalc = self.make_calculator([1])
- self.assertEqual([], servcalc.servers_for_queue([]))
+ self.assertEqual(([], {}), servcalc.servers_for_queue([]))
def test_easy_server_count(self):
servcalc = self.make_calculator([1])
- servlist = self.calculate(servcalc, {'min_nodes': 3})
+ servlist, _ = self.calculate(servcalc, {'min_nodes': 3})
self.assertEqual(3, len(servlist))
def test_default_5pct_ram_value_decrease(self):
servcalc = self.make_calculator([1])
- servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
+ servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
self.assertEqual(0, len(servlist))
- servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 121})
+ servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 121})
self.assertEqual(1, len(servlist))
def test_custom_node_mem_scaling_factor(self):
# Simulate a custom 'node_mem_scaling' config parameter by passing
# the value to ServerCalculator
servcalc = self.make_calculator([1], node_mem_scaling=0.5)
- servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
+ servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
self.assertEqual(0, len(servlist))
- servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 64})
+ servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 64})
self.assertEqual(1, len(servlist))
def test_implicit_server_count(self):
servcalc = self.make_calculator([1])
- servlist = self.calculate(servcalc, {}, {'min_nodes': 3})
+ servlist, _ = self.calculate(servcalc, {}, {'min_nodes': 3})
self.assertEqual(4, len(servlist))
def test_bad_min_nodes_override(self):
servcalc = self.make_calculator([1])
- servlist = self.calculate(servcalc,
- {'min_nodes': -2}, {'min_nodes': 'foo'})
+ servlist, _ = self.calculate(servcalc,
+ {'min_nodes': -2}, {'min_nodes': 'foo'})
self.assertEqual(2, len(servlist))
- def test_ignore_unsatisfiable_jobs(self):
+ def test_ignore_and_return_unsatisfiable_jobs(self):
servcalc = self.make_calculator([1], max_nodes=9)
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 2},
- {'min_ram_mb_per_node': 256},
- {'min_nodes': 6},
- {'min_nodes': 12},
- {'min_scratch_mb_per_node': 300000})
+ servlist, u_jobs = self.calculate(servcalc,
+ {'min_cores_per_node': 2},
+ {'min_ram_mb_per_node': 256},
+ {'min_nodes': 6},
+ {'min_nodes': 12},
+ {'min_scratch_mb_per_node': 300000})
self.assertEqual(6, len(servlist))
+ # Only unsatisfiable jobs are returned in u_jobs
+ self.assertIn('zzzzz-jjjjj-000000000000000', u_jobs.keys())
+ self.assertIn('zzzzz-jjjjj-000000000000001', u_jobs.keys())
+ self.assertNotIn('zzzzz-jjjjj-000000000000002', u_jobs.keys())
+ self.assertIn('zzzzz-jjjjj-000000000000003', u_jobs.keys())
+ self.assertIn('zzzzz-jjjjj-000000000000004', u_jobs.keys())
def test_ignore_too_expensive_jobs(self):
servcalc = self.make_calculator([1, 2], max_nodes=12, max_price=6)
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 1, 'min_nodes': 6})
+ servlist, _ = self.calculate(servcalc,
+ {'min_cores_per_node': 1, 'min_nodes': 6})
self.assertEqual(6, len(servlist))
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 2, 'min_nodes': 6})
+ servlist, _ = self.calculate(servcalc,
+ {'min_cores_per_node': 2, 'min_nodes': 6})
self.assertEqual(0, len(servlist))
def test_job_requesting_max_nodes_accepted(self):
servcalc = self.make_calculator([1], max_nodes=4)
- servlist = self.calculate(servcalc, {'min_nodes': 4})
+ servlist, _ = self.calculate(servcalc, {'min_nodes': 4})
self.assertEqual(4, len(servlist))
def test_cheapest_size(self):
def test_next_biggest(self):
servcalc = self.make_calculator([1, 2, 4, 8])
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 3},
- {'min_cores_per_node': 6})
+ servlist, _ = self.calculate(servcalc,
+ {'min_cores_per_node': 3},
+ {'min_cores_per_node': 6})
self.assertEqual([servcalc.cloud_sizes[2].id,
servcalc.cloud_sizes[3].id],
[s.id for s in servlist])
def test_multiple_sizes(self):
servcalc = self.make_calculator([1, 2])
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 2},
- {'min_cores_per_node': 1},
- {'min_cores_per_node': 1})
+ servlist, _ = self.calculate(servcalc,
+ {'min_cores_per_node': 2},
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 1})
self.assertEqual([servcalc.cloud_sizes[1].id,
servcalc.cloud_sizes[0].id,
servcalc.cloud_sizes[0].id],
[s.id for s in servlist])
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 1},
- {'min_cores_per_node': 2},
- {'min_cores_per_node': 1})
+ servlist, _ = self.calculate(servcalc,
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 2},
+ {'min_cores_per_node': 1})
self.assertEqual([servcalc.cloud_sizes[0].id,
servcalc.cloud_sizes[1].id,
servcalc.cloud_sizes[0].id],
[s.id for s in servlist])
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 1},
- {'min_cores_per_node': 1},
- {'min_cores_per_node': 2})
+ servlist, _ = self.calculate(servcalc,
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 2})
self.assertEqual([servcalc.cloud_sizes[0].id,
servcalc.cloud_sizes[0].id,
servcalc.cloud_sizes[1].id],
unittest.TestCase):
TEST_CLASS = jobqueue.JobQueueMonitorActor
+
class MockCalculator(object):
@staticmethod
def servers_for_queue(queue):
- return [testutil.MockSize(n) for n in queue]
+ return ([testutil.MockSize(n) for n in queue], {})
+
+
+ class MockCalculatorUnsatisfiableJobs(object):
+ @staticmethod
+ def servers_for_queue(queue):
+ return ([], {k["uuid"]: "Unsatisfiable job mock" for k in queue})
def build_monitor(self, side_effect, *args, **kwargs):
super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs)
self.client.jobs().queue().execute.side_effect = side_effect
+ @mock.patch("subprocess.check_call")
+ @mock.patch("subprocess.check_output")
+ def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
+ job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
+ container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
+ mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
+
+ self.build_monitor([{'items': [{'uuid': job_uuid}]}],
+ self.MockCalculatorUnsatisfiableJobs(), True, True)
+ self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+ self.monitor.ping().get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
+ self.client.jobs().cancel.assert_called_with(uuid=job_uuid)
+ mock_scancel.assert_called_with(['scancel', '--name='+container_uuid])
+
@mock.patch("subprocess.check_output")
def test_subscribers_get_server_lists(self, mock_squeue):
mock_squeue.return_value = ""
from __future__ import absolute_import, print_function
from future import standard_library
+import json
import requests
import unittest
class TestServer(object):
+ def __init__(self, management_token=None):
+ self.mgmt_token = management_token
+
def __enter__(self):
cfg = config.NodeManagerConfig()
cfg.set('Manage', 'port', '0')
cfg.set('Manage', 'address', '127.0.0.1')
+ if self.mgmt_token is not None:
+ cfg.set('Manage', 'ManagementToken', self.mgmt_token)
self.srv = status.Server(cfg)
self.srv.start()
addr, port = self.srv.server_address
def get_status(self):
return self.get_status_response().json()
+ def get_healthcheck_ping(self, auth_header=None):
+ headers = {}
+ if auth_header is not None:
+ headers['Authorization'] = auth_header
+ return requests.get(self.srv_base+'/_health/ping', headers=headers)
class StatusServerUpdates(unittest.TestCase):
def test_updates(self):
self.srv.start()
self.assertFalse(self.srv.enabled)
self.assertFalse(getattr(self.srv, '_thread', False))
+
+class HealthcheckPing(unittest.TestCase):
+ def test_ping_disabled(self):
+ with TestServer() as srv:
+ r = srv.get_healthcheck_ping()
+ self.assertEqual(404, r.status_code)
+
+ def test_ping_no_auth(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping()
+ self.assertEqual(401, r.status_code)
+
+ def test_ping_bad_auth_format(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping('noBearer')
+ self.assertEqual(403, r.status_code)
+
+ def test_ping_bad_auth_token(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping('Bearer badtoken')
+ self.assertEqual(403, r.status_code)
+
+ def test_ping_success(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping('Bearer configuredmanagementtoken')
+ self.assertEqual(200, r.status_code)
+ self.assertEqual('application/json', r.headers['content-type'])
+ resp = r.json()
+ self.assertEqual('{"health": "OK"}', json.dumps(resp))
def test_delayed_turnaround(self):
receiver = mock.Mock()
- with mock.patch('time.time', return_value=0) as mock_now:
- deliverer = timedcallback.TimedCallBackActor.start().proxy()
- deliverer.schedule(1, receiver, 'delayed')
- deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
- self.assertFalse(receiver.called)
- mock_now.return_value = 2
- deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
- self.stop_proxy(deliverer)
+ mock_now = mock.Mock()
+ mock_now.return_value = 0
+ deliverer = timedcallback.TimedCallBackActor.start(timefunc=mock_now).proxy()
+ deliverer.schedule(1, receiver, 'delayed')
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ self.assertFalse(receiver.called)
+ mock_now.return_value = 2
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ self.stop_proxy(deliverer)
receiver.assert_called_with('delayed')
def test_out_of_order_scheduling(self):
receiver = mock.Mock()
- with mock.patch('time.time', return_value=1.5) as mock_now:
- deliverer = timedcallback.TimedCallBackActor.start().proxy()
- deliverer.schedule(2, receiver, 'second')
- deliverer.schedule(1, receiver, 'first')
- deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
- receiver.assert_called_with('first')
- mock_now.return_value = 2.5
- deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
- self.stop_proxy(deliverer)
+ mock_now = mock.Mock()
+ mock_now.return_value = 1.5
+ deliverer = timedcallback.TimedCallBackActor.start(timefunc=mock_now).proxy()
+ deliverer.schedule(2, receiver, 'second')
+ deliverer.schedule(1, receiver, 'first')
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ receiver.assert_called_with('first')
+ mock_now.return_value = 2.5
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ self.stop_proxy(deliverer)
receiver.assert_called_with('second')
def test_dead_actors_ignored(self):
if __name__ == '__main__':
unittest.main()
-
pykka.ActorRegistry.stop_all()
def stop_proxy(self, proxy):
- return proxy.actor_ref.stop(timeout=self.TIMEOUT)
+ th = proxy.get_thread().get()
+ t = proxy.actor_ref.stop(timeout=self.TIMEOUT)
+ th.join()
+ return t
def wait_for_assignment(self, proxy, attr_name, unassigned=None,
timeout=TIMEOUT):
if result is not unassigned:
return result
- def busywait(self, f):
+ def busywait(self, f, finalize=None):
n = 0
- while not f() and n < 10:
+ while not f() and n < 20:
time.sleep(.1)
n += 1
+ if finalize is not None:
+ finalize()
self.assertTrue(f())
}
func (ps *pgEventSource) DBHealth() error {
- ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+ defer cancel()
var i int
return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
}
// Receive websocket frames from the client and pass them to
// sess.Receive().
go func() {
+ defer cancel()
buf := make([]byte, 2<<20)
for {
select {
err = errFrameTooBig
}
if err != nil {
- if err != io.EOF {
+ if err != io.EOF && ctx.Err() == nil {
log.WithError(err).Info("read error")
}
- cancel()
return
}
err = sess.Receive(buf)
if err != nil {
log.WithError(err).Error("sess.Receive() failed")
- cancel()
return
}
}
// sess.EventMessage() as needed, and send them to the client
// as websocket frames.
go func() {
+ defer cancel()
for {
var ok bool
var data interface{}
buf, err = sess.EventMessage(e)
if err != nil {
log.WithError(err).Error("EventMessage failed")
- cancel()
- break
+ return
} else if len(buf) == 0 {
log.Debug("skip")
continue
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
- log.WithError(err).Error("write failed")
- cancel()
- break
+ if ctx.Err() == nil {
+ log.WithError(err).Error("write failed")
+ }
+ return
}
log.Debug("sent")
// is done/cancelled or the incoming event stream ends. Shut
// down the handler if the outgoing queue fills up.
go func() {
+ defer cancel()
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
default:
}
}
- continue
case e, ok := <-incoming.Channel():
if !ok {
- cancel()
return
}
if !sess.Filter(e) {
case queue <- e:
default:
log.WithError(errQueueFull).Error("terminate")
- cancel()
return
}
}
"database/sql"
"encoding/json"
"errors"
+ "reflect"
"sync"
"sync/atomic"
"time"
sess.mtx.Unlock()
sub.sendOldEvents(sess)
return nil
+ } else if sub.Method == "unsubscribe" {
+ sess.mtx.Lock()
+ found := false
+ for i, s := range sess.subscriptions {
+ if !reflect.DeepEqual(s.Filters, sub.Filters) {
+ continue
+ }
+ copy(sess.subscriptions[i:], sess.subscriptions[i+1:])
+ sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1]
+ found = true
+ break
+ }
+ sess.mtx.Unlock()
+ sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe")
+ if found {
+ sess.sendq <- v0subscribeOK
+ return nil
+ }
} else {
sess.log.WithField("Method", sub.Method).Info("unknown method")
}
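
The v0 protocol now accepts an explicit unsubscribe whose filters must DeepEqual an earlier subscribe; anything else gets a 400. The client-side message pair, sketched in Python for consistency with the rest of these notes (ws assumed to be an open websocket client):

    import json

    sub = {'method': 'subscribe',
           'filters': [['event_type', 'in', ['update']]]}
    ws.send(json.dumps(sub))                              # acked with status 200
    ws.send(json.dumps(dict(sub, method='unsubscribe')))
    # acked with status 200, or 400 if no subscription had exactly these filters
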
// client will probably reconnect and do the
// same thing all over again.
time.Sleep(100 * time.Millisecond)
+ if sess.ws.Request().Context().Err() != nil {
+ // Session terminated while we were sleeping
+ return
+ }
}
now := time.Now()
e := &event{
conn, r, w := s.testClient()
defer conn.Close()
- c.Check(w.Encode(map[string]interface{}{
- "method": "subscribe",
- "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
- }), check.IsNil)
- s.expectStatus(c, r, 200)
+ cmd := func(method, eventType string, status int) {
+ c.Check(w.Encode(map[string]interface{}{
+ "method": method,
+ "filters": [][]interface{}{{"event_type", "in", []string{eventType}}},
+ }), check.IsNil)
+ s.expectStatus(c, r, status)
+ }
+ cmd("subscribe", "update", 200)
+ cmd("subscribe", "update", 200)
+ cmd("subscribe", "create", 200)
+ cmd("subscribe", "update", 200)
+ cmd("unsubscribe", "blip", 400)
+ cmd("unsubscribe", "create", 200)
+ cmd("unsubscribe", "update", 200)
go s.emitEvents(nil)
lg := s.expectLog(c, r)
c.Check(lg.EventType, check.Equals, "update")
+
+ cmd("unsubscribe", "update", 200)
+ cmd("unsubscribe", "update", 200)
+ cmd("unsubscribe", "update", 400)
}
func (s *v0Suite) TestLastLogID(c *check.C) {
GOPATH=$PWD go get github.com/curoverse/runsvinit && \
install bin/runsvinit /usr/local/bin
+ENV PJSVERSION=1.9.7
+
RUN set -e && \
- PJS=phantomjs-1.9.7-linux-x86_64 && \
- curl -L -o/tmp/$PJS.tar.bz2 http://cache.arvados.org/$PJS.tar.bz2 && \
- tar -C /usr/local -xjf /tmp/$PJS.tar.bz2 && \
- ln -s ../$PJS/bin/phantomjs /usr/local/bin/
+ curl -L -f http://cache.arvados.org/phantomjs-${PJSVERSION}-linux-x86_64.tar.bz2 | tar -C /usr/local -xjf - && \
+ ln -s ../phantomjs-${PJSVERSION}-linux-x86_64/bin/phantomjs /usr/local/bin
RUN pip install -U setuptools
+ENV NODEVERSION v6.11.2
+
+# Install nodejs binary
+RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
+ ln -s ../node-${NODEVERSION}-linux-x64/bin/node ../node-${NODEVERSION}-linux-x64/bin/npm /usr/local/bin
+
ARG arvados_version
RUN echo arvados_version is git commit $arvados_version
description='Summarize resource usage of an Arvados Crunch job')
src = self.add_mutually_exclusive_group()
src.add_argument(
- '--job', type=str, metavar='UUID',
- help='Look up the specified job and read its log data from Keep'
- ' (or from the Arvados event log, if the job is still running)')
+ '--job', '--container', '--container-request',
+ type=str, metavar='UUID',
+ help='Look up the specified job, container, or container request '
+ 'and read its log data from Keep (or from the Arvados event log, '
+ 'if the job is still running)')
src.add_argument(
'--pipeline-instance', type=str, metavar='UUID',
help='Summarize each component of the given pipeline instance')
self.add_argument(
'--format', type=str, choices=('html', 'text'), default='text',
help='Report format')
+ self.add_argument(
+ '--threads', type=int, default=8,
+ help='Maximum worker threads to run')
self.add_argument(
'--verbose', '-v', action='count', default=0,
help='Log more information (once for progress, twice for debug)')
def run(self):
kwargs = {
'skip_child_jobs': self.args.skip_child_jobs,
+ 'threads': self.args.threads,
}
if self.args.pipeline_instance:
- self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance, **kwargs)
+ self.summer = summarizer.NewSummarizer(self.args.pipeline_instance, **kwargs)
elif self.args.job:
- self.summer = summarizer.JobSummarizer(self.args.job, **kwargs)
+ self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
elif self.args.log_file:
if self.args.log_file.endswith('.gz'):
fh = gzip.open(self.args.log_file)
logger.debug('load collection %s', collection_id)
collection = arvados.collection.CollectionReader(collection_id)
filenames = [filename for filename in collection]
- if len(filenames) != 1:
- raise ValueError(
- "collection {} has {} files; need exactly one".format(
- collection_id, len(filenames)))
- self._reader = collection.open(filenames[0])
- self._label = "{}/{}".format(collection_id, filenames[0])
+ if len(filenames) == 1:
+ filename = filenames[0]
+ else:
+ filename = 'crunchstat.txt'
+ self._reader = collection.open(filename)
+ self._label = "{}/{}".format(collection_id, filename)
def __str__(self):
return self._label
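
For orientation, the parser below has to cope with two log flavors: crunch1 lines carry a timestamp, job UUID, pid, and task sequence number before the crunchstat payload, while crunch2 lines are just an ISO timestamp plus the payload. Made-up samples of each:

    # crunch1: underscore timestamp, job UUID, pid, task seq, then the payload
    crunch1 = ('2017-01-02_03:04:05 zzzzz-8i9sb-000000000000000 12345 0 '
               'stderr crunchstat: cpu 30.0000 user 3.0000 sys 4 cpus '
               '-- interval 10.0000 seconds 1.0000 user 0.5000 sys')
    # crunch2: ISO timestamp straight into the payload
    crunch2 = ('2017-01-02T03:04:05.000000000Z cpu 30.0000 user 3.0000 sys 4 cpus '
               '-- interval 10.0000 seconds 1.0000 user 0.5000 sys')
    # Seeing any '-8i9sb-' (job) UUID flips detected_crunch1 on for the whole
    # stream; otherwise lines are parsed with the shorter crunch2 pattern.
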
class Summarizer(object):
- def __init__(self, logdata, label=None, skip_child_jobs=False):
+ def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
self._logdata = logdata
+ self.uuid = uuid
self.label = label
self.starttime = None
self.finishtime = None
def run(self):
logger.debug("%s: parsing logdata %s", self.label, self._logdata)
+ self.detected_crunch1 = False
for line in self._logdata:
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
- if m:
- seq = int(m.group('seq'))
- uuid = m.group('task_uuid')
- self.seq_to_uuid[seq] = uuid
- logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
- continue
-
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
- if m:
- task_id = self.seq_to_uuid[int(m.group('seq'))]
- elapsed = int(m.group('elapsed'))
- self.task_stats[task_id]['time'] = {'elapsed': elapsed}
- if elapsed > self.stats_max['time']['elapsed']:
- self.stats_max['time']['elapsed'] = elapsed
- continue
-
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
- if m:
- uuid = m.group('uuid')
- if self._skip_child_jobs:
- logger.warning('%s: omitting stats from child job %s'
- ' because --skip-child-jobs flag is on',
- self.label, uuid)
+ if not self.detected_crunch1 and '-8i9sb-' in line:
+ self.detected_crunch1 = True
+
+ if self.detected_crunch1:
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
+ if m:
+ seq = int(m.group('seq'))
+ uuid = m.group('task_uuid')
+ self.seq_to_uuid[seq] = uuid
+ logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
continue
- logger.debug('%s: follow %s', self.label, uuid)
- child_summarizer = JobSummarizer(uuid)
- child_summarizer.stats_max = self.stats_max
- child_summarizer.task_stats = self.task_stats
- child_summarizer.tasks = self.tasks
- child_summarizer.starttime = self.starttime
- child_summarizer.run()
- logger.debug('%s: done %s', self.label, uuid)
- continue
- m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
- if not m:
- continue
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
+ if m:
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
+ elapsed = int(m.group('elapsed'))
+ self.task_stats[task_id]['time'] = {'elapsed': elapsed}
+ if elapsed > self.stats_max['time']['elapsed']:
+ self.stats_max['time']['elapsed'] = elapsed
+ continue
- try:
- if self.label is None:
- self.label = m.group('job_uuid')
- logger.debug('%s: using job uuid as label', self.label)
- if m.group('category').endswith(':'):
- # "stderr crunchstat: notice: ..."
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
+ if m:
+ uuid = m.group('uuid')
+ if self._skip_child_jobs:
+ logger.warning('%s: omitting stats from child job %s'
+ ' because --skip-child-jobs flag is on',
+ self.label, uuid)
+ continue
+ logger.debug('%s: follow %s', self.label, uuid)
+ child_summarizer = ProcessSummarizer(uuid)
+ child_summarizer.stats_max = self.stats_max
+ child_summarizer.task_stats = self.task_stats
+ child_summarizer.tasks = self.tasks
+ child_summarizer.starttime = self.starttime
+ child_summarizer.run()
+ logger.debug('%s: done %s', self.label, uuid)
continue
- elif m.group('category') in ('error', 'caught'):
+
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+ if not m:
continue
- elif m.group('category') == 'read':
- # "stderr crunchstat: read /proc/1234/net/dev: ..."
- # (crunchstat formatting fixed, but old logs still say this)
+ else:
+ # crunch2
+ m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+ if not m:
continue
+
+ if self.label is None:
+ try:
+ self.label = m.group('job_uuid')
+ except IndexError:
+ self.label = 'container'
+ if m.group('category').endswith(':'):
+ # "stderr crunchstat: notice: ..."
+ continue
+ elif m.group('category') in ('error', 'caught'):
+ continue
+ elif m.group('category') in ['read', 'open', 'cgroup', 'CID']:
+ # "stderr crunchstat: read /proc/1234/net/dev: ..."
+ # (old logs are less careful with unprefixed error messages)
+ continue
+
+ if self.detected_crunch1:
task_id = self.seq_to_uuid[int(m.group('seq'))]
- task = self.tasks[task_id]
+ else:
+ task_id = 'container'
+ task = self.tasks[task_id]
- # Use the first and last crunchstat timestamps as
- # approximations of starttime and finishtime.
+ # Use the first and last crunchstat timestamps as
+ # approximations of starttime and finishtime.
+ timestamp = m.group('timestamp')
+ if timestamp[10:11] == '_':
timestamp = datetime.datetime.strptime(
- m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
- if not task.starttime:
- task.starttime = timestamp
- logger.debug('%s: task %s starttime %s',
- self.label, task_id, timestamp)
- task.finishtime = timestamp
-
- if not self.starttime:
- self.starttime = timestamp
- self.finishtime = timestamp
-
- this_interval_s = None
- for group in ['current', 'interval']:
- if not m.group(group):
- continue
- category = m.group('category')
- words = m.group(group).split(' ')
- stats = {}
+ timestamp, '%Y-%m-%d_%H:%M:%S')
+ elif timestamp[10:11] == 'T':
+ timestamp = datetime.datetime.strptime(
+ timestamp[:19], '%Y-%m-%dT%H:%M:%S')
+ else:
+ raise ValueError("Cannot parse timestamp {!r}".format(
+ timestamp))
+
+ if not task.starttime:
+ task.starttime = timestamp
+ logger.debug('%s: task %s starttime %s',
+ self.label, task_id, timestamp)
+ task.finishtime = timestamp
+
+ if not self.starttime:
+ self.starttime = timestamp
+ self.finishtime = timestamp
+
+ this_interval_s = None
+ for group in ['current', 'interval']:
+ if not m.group(group):
+ continue
+ category = m.group('category')
+ words = m.group(group).split(' ')
+ stats = {}
+ try:
for val, stat in zip(words[::2], words[1::2]):
- try:
- if '.' in val:
- stats[stat] = float(val)
- else:
- stats[stat] = int(val)
- except ValueError as e:
- raise ValueError(
- 'Error parsing {} stat: {!r}'.format(
- stat, e))
- if 'user' in stats or 'sys' in stats:
- stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
- if 'tx' in stats or 'rx' in stats:
- stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
- for stat, val in stats.iteritems():
- if group == 'interval':
- if stat == 'seconds':
- this_interval_s = val
- continue
- elif not (this_interval_s > 0):
- logger.error(
- "BUG? interval stat given with duration {!r}".
- format(this_interval_s))
- continue
- else:
- stat = stat + '__rate'
- val = val / this_interval_s
- if stat in ['user+sys__rate', 'tx+rx__rate']:
- task.series[category, stat].append(
- (timestamp - self.starttime, val))
+ if '.' in val:
+ stats[stat] = float(val)
+ else:
+ stats[stat] = int(val)
+ except ValueError as e:
+ logger.warning('Error parsing {} stat: {!r}'.format(
+ stat, e))
+ continue
+ if 'user' in stats or 'sys' in stats:
+ stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
+ if 'tx' in stats or 'rx' in stats:
+ stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
+ for stat, val in stats.iteritems():
+ if group == 'interval':
+ if stat == 'seconds':
+ this_interval_s = val
+ continue
+ elif not (this_interval_s > 0):
+ logger.error(
+ "BUG? interval stat given with duration {!r}".
+ format(this_interval_s))
+ continue
else:
- if stat in ['rss']:
+ stat = stat + '__rate'
+ val = val / this_interval_s
+ if stat in ['user+sys__rate', 'tx+rx__rate']:
task.series[category, stat].append(
(timestamp - self.starttime, val))
- self.task_stats[task_id][category][stat] = val
- if val > self.stats_max[category][stat]:
- self.stats_max[category][stat] = val
- except Exception as e:
- logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
+ else:
+ if stat in ['rss']:
+ task.series[category, stat].append(
+ (timestamp - self.starttime, val))
+ self.task_stats[task_id][category][stat] = val
+ if val > self.stats_max[category][stat]:
+ self.stats_max[category][stat] = val
logger.debug('%s: done parsing', self.label)
self.job_tot = collections.defaultdict(
def _recommend_cpu(self):
"""Recommend asking for 4 cores if max CPU usage was 333%"""
+ constraint_key = self._map_runtime_constraint('vcpus')
cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
if cpu_max_rate == float('-Inf'):
logger.warning('%s: no CPU usage data', self.label)
return
used_cores = max(1, int(math.ceil(cpu_max_rate)))
- asked_cores = self.existing_constraints.get('min_cores_per_node')
+ asked_cores = self.existing_constraints.get(constraint_key)
if asked_cores is None or used_cores < asked_cores:
yield (
'#!! {} max CPU usage was {}% -- '
- 'try runtime_constraints "min_cores_per_node":{}'
+ 'try runtime_constraints "{}":{}'
).format(
self.label,
int(math.ceil(cpu_max_rate*100)),
+ constraint_key,
int(used_cores))
def _recommend_ram(self):
the memory we want -- even if that happens to be 8192 MiB.
"""
+ constraint_key = self._map_runtime_constraint('ram')
used_bytes = self.stats_max['mem']['rss']
if used_bytes == float('-Inf'):
logger.warning('%s: no memory usage data', self.label)
return
used_mib = math.ceil(float(used_bytes) / 1048576)
- asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
+ asked_mib = self.existing_constraints.get(constraint_key)
nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
if asked_mib is None or (
math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
yield (
'#!! {} max RSS was {} MiB -- '
- 'try runtime_constraints "min_ram_mb_per_node":{}'
+ 'try runtime_constraints "{}":{}'
).format(
self.label,
int(used_mib),
- int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
+ constraint_key,
+ int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))
def _recommend_keep_cache(self):
"""Recommend increasing keep cache if utilization < 80%"""
+ constraint_key = self._map_runtime_constraint('keep_cache_ram')
if self.job_tot['net:keep0']['rx'] == 0:
return
utilization = (float(self.job_tot['blkio:0:0']['read']) /
float(self.job_tot['net:keep0']['rx']))
- asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
+ asked_mib = self.existing_constraints.get(constraint_key, 256)
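+ # The keep cache defaults to 256 MiB; below 80% utilization, suggest doubling it.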
if utilization < 0.8:
yield (
'#!! {} Keep cache utilization was {:.2f}% -- '
- 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
+ 'try runtime_constraints "{}":{} (or more)'
).format(
self.label,
utilization * 100.0,
- asked_mib*2)
+ constraint_key,
+ asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())
def _format(self, val):
else:
return '{}'.format(val)
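+ # Jobs and containers name their runtime constraints differently and
+ # measure memory in different units (MiB for jobs, bytes for containers);
+ # these helpers pick the right key and unit for the process being summarized.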
+ def _runtime_constraint_mem_unit(self):
+ if hasattr(self, 'runtime_constraint_mem_unit'):
+ return self.runtime_constraint_mem_unit
+ elif self.detected_crunch1:
+ return JobSummarizer.runtime_constraint_mem_unit
+ else:
+ return ContainerSummarizer.runtime_constraint_mem_unit
+
+ def _map_runtime_constraint(self, key):
+ if hasattr(self, 'map_runtime_constraint'):
+ return self.map_runtime_constraint[key]
+ elif self.detected_crunch1:
+ return JobSummarizer.map_runtime_constraint[key]
+ else:
+ return key
+
class CollectionSummarizer(Summarizer):
def __init__(self, collection_id, **kwargs):
self.label = collection_id
-class JobSummarizer(Summarizer):
- def __init__(self, job, **kwargs):
- arv = arvados.api('v1')
- if isinstance(job, basestring):
- self.job = arv.jobs().get(uuid=job).execute()
- else:
- self.job = job
+def NewSummarizer(process_or_uuid, **kwargs):
+ """Construct with the appropriate subclass for this uuid/object."""
+
+ if isinstance(process_or_uuid, dict):
+ process = process_or_uuid
+ uuid = process['uuid']
+ else:
+ uuid = process_or_uuid
+ process = None
+ arv = arvados.api('v1', model=OrderedJsonModel())
+
+ if '-dz642-' in uuid:
+ if process is None:
+ process = arv.containers().get(uuid=uuid).execute()
+ klass = ContainerTreeSummarizer
+ elif '-xvhdp-' in uuid:
+ if process is None:
+ process = arv.container_requests().get(uuid=uuid).execute()
+ klass = ContainerTreeSummarizer
+ elif '-8i9sb-' in uuid:
+ if process is None:
+ process = arv.jobs().get(uuid=uuid).execute()
+ klass = JobSummarizer
+ elif '-d1hrv-' in uuid:
+ if process is None:
+ process = arv.pipeline_instances().get(uuid=uuid).execute()
+ klass = PipelineSummarizer
+ elif '-4zz18-' in uuid:
+ return CollectionSummarizer(collection_id=uuid)
+ else:
+ raise ValueError("Unrecognized uuid %s" % uuid)
+ return klass(process, uuid=uuid, **kwargs)
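+ # Example (hypothetical uuid): NewSummarizer('zzzzz-xvhdp-0123456789abcde')
+ # would fetch that container request and build a ContainerTreeSummarizer.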
+
+
+class ProcessSummarizer(Summarizer):
+ """Process is a job, pipeline, container, or container request."""
+
+ def __init__(self, process, label=None, **kwargs):
rdr = None
- if self.job.get('log'):
+ self.process = process
+ if label is None:
+ label = self.process.get('name', self.process['uuid'])
+ if self.process.get('log'):
try:
- rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
+ rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
except arvados.errors.NotFoundError as e:
logger.warning("Trying event logs after failing to read "
- "log collection %s: %s", self.job['log'], e)
- else:
- label = self.job['uuid']
+ "log collection %s: %s", self.process['log'], e)
if rdr is None:
- rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
- label = self.job['uuid'] + ' (partial)'
- super(JobSummarizer, self).__init__(rdr, **kwargs)
- self.label = label
- self.existing_constraints = self.job.get('runtime_constraints', {})
+ rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
+ label = label + ' (partial)'
+ super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
+ self.existing_constraints = self.process.get('runtime_constraints', {})
-class PipelineSummarizer(object):
- def __init__(self, pipeline_instance_uuid, **kwargs):
- arv = arvados.api('v1', model=OrderedJsonModel())
- instance = arv.pipeline_instances().get(
- uuid=pipeline_instance_uuid).execute()
- self.summarizers = collections.OrderedDict()
- for cname, component in instance['components'].iteritems():
- if 'job' not in component:
- logger.warning(
- "%s: skipping component with no job assigned", cname)
- else:
- logger.info(
- "%s: job %s", cname, component['job']['uuid'])
- summarizer = JobSummarizer(component['job'], **kwargs)
- summarizer.label = '{} {}'.format(
- cname, component['job']['uuid'])
- self.summarizers[cname] = summarizer
- self.label = pipeline_instance_uuid
+class JobSummarizer(ProcessSummarizer):
+ runtime_constraint_mem_unit = 1048576
+ map_runtime_constraint = {
+ 'keep_cache_ram': 'keep_cache_mb_per_task',
+ 'ram': 'min_ram_mb_per_node',
+ 'vcpus': 'min_cores_per_node',
+ }
+
+
+class ContainerSummarizer(ProcessSummarizer):
+ runtime_constraint_mem_unit = 1
+
+
+class MultiSummarizer(object):
+ def __init__(self, children=None, label=None, threads=1, **kwargs):
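+ # A semaphore caps how many children run concurrently in run().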
+ self.throttle = threading.Semaphore(threads)
+ self.children = children or {}
+ self.label = label
+
+ def run_and_release(self, target, *args, **kwargs):
+ try:
+ return target(*args, **kwargs)
+ finally:
+ self.throttle.release()
def run(self):
threads = []
- for summarizer in self.summarizers.itervalues():
- t = threading.Thread(target=summarizer.run)
+ for child in self.children.itervalues():
+ self.throttle.acquire()
+ t = threading.Thread(target=self.run_and_release, args=(child.run, ))
t.daemon = True
t.start()
threads.append(t)
def text_report(self):
txt = ''
- for cname, summarizer in self.summarizers.iteritems():
- txt += '### Summary for {} ({})\n'.format(
- cname, summarizer.job['uuid'])
- txt += summarizer.text_report()
+ for cname, child in self.children.iteritems():
+ if len(self.children) > 1:
+ txt += '### Summary for {} ({})\n'.format(
+ cname, child.process['uuid'])
+ txt += child.text_report()
txt += '\n'
return txt
def html_report(self):
- return WEBCHART_CLASS(self.label, self.summarizers.itervalues()).html()
+ return WEBCHART_CLASS(self.label, self.children.itervalues()).html()
+
+
+class PipelineSummarizer(MultiSummarizer):
+ def __init__(self, instance, **kwargs):
+ children = collections.OrderedDict()
+ for cname, component in instance['components'].iteritems():
+ if 'job' not in component:
+ logger.warning(
+ "%s: skipping component with no job assigned", cname)
+ else:
+ logger.info(
+ "%s: job %s", cname, component['job']['uuid'])
+ summarizer = JobSummarizer(component['job'], **kwargs)
+ summarizer.label = '{} {}'.format(
+ cname, component['job']['uuid'])
+ children[cname] = summarizer
+ super(PipelineSummarizer, self).__init__(
+ children=children,
+ label=instance['uuid'],
+ **kwargs)
+
+
+class ContainerTreeSummarizer(MultiSummarizer):
+ def __init__(self, root, **kwargs):
+ arv = arvados.api('v1', model=OrderedJsonModel())
+
+ label = kwargs.pop('label', None) or root.get('name') or root['uuid']
+ root['name'] = label
+
+ children = collections.OrderedDict()
+ todo = collections.deque((root, ))
+ while todo:
+ current = todo.popleft()
+ label = current['name']
+ sort_key = current['created_at']
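+ # Resolve a container request to the container it actually ran.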
+ if '-xvhdp-' in current['uuid']:
+ current = arv.containers().get(uuid=current['container_uuid']).execute()
+
+ summer = ContainerSummarizer(current, label=label, **kwargs)
+ summer.sort_key = sort_key
+ children[current['uuid']] = summer
+
+ page_filters = []
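+ # Page through child container requests keyset-style, filtering on
+ # the last uuid seen instead of using offsets.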
+ while True:
+ items = arv.container_requests().index(
+ order=['uuid asc'],
+ filters=page_filters+[
+ ['requesting_container_uuid', '=', current['uuid']]],
+ ).execute()['items']
+ if not items:
+ break
+ page_filters = [['uuid', '>', items[-1]['uuid']]]
+ for cr in items:
+ if cr['container_uuid']:
+ logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
+ cr['name'] = cr.get('name') or cr['uuid']
+ todo.append(cr)
+ sorted_children = collections.OrderedDict()
+ for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
+ sorted_children[uuid] = children[uuid]
+ super(ContainerTreeSummarizer, self).__init__(
+ children=sorted_children,
+ label=root['name'],
+ **kwargs)
--- /dev/null
+category metric task_max task_max_rate job_total
+cpu cpus 20 - -
+cpu sys 0.82 0.08 0.82
+cpu user 2.31 0.22 2.31
+cpu user+sys 3.13 0.30 3.13
+mem cache 23846912 - -
+mem pgmajfault 121 - 121
+mem rss 65470464 - -
+mem swap 0 - -
+net:eth0 rx 500762 951.15 500762
+net:eth0 tx 36242 226.61 36242
+net:eth0 tx+rx 537004 1177.76 537004
+# Number of tasks: 1
+# Max CPU time spent by a single task: 3.13s
+# Max CPU usage in a single interval: 29.89%
+# Overall CPU usage: 0%
+# Max memory used by a single task: 0.07GB
+# Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! container max CPU usage was 30% -- try runtime_constraints "vcpus":1
+#!! container max RSS was 63 MiB -- try runtime_constraints "ram":1020054732
s.run()
+class SummarizeContainer(ReportDiff):
+ fake_container = {
+ 'uuid': '9tee4-dz642-mjfb0i5hzojp16a',
+ 'created_at': '2017-08-18T14:27:25.371388141',
+ 'log': '9tee4-4zz18-ihyzym9tcwjwg4r',
+ }
+ fake_request = {
+ 'uuid': '9tee4-xvhdp-uper95jktm10d3w',
+ 'name': 'container',
+ 'created_at': '2017-08-18T14:27:25.242339223Z',
+ 'container_uuid': fake_container['uuid'],
+ }
+ logfile = os.path.join(
+ TESTS_DIR, 'container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz')
+
+ @mock.patch('arvados.collection.CollectionReader')
+ @mock.patch('arvados.api')
+ def test_container(self, mock_api, mock_cr):
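+ # Stub the API and collection reader so the summarizer reads the canned gzip log.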
+ mock_api().container_requests().index().execute.return_value = {'items':[]}
+ mock_api().container_requests().get().execute.return_value = self.fake_request
+ mock_api().containers().get().execute.return_value = self.fake_container
+ mock_cr().__iter__.return_value = [
+ 'crunch-run.txt', 'stderr.txt', 'node-info.txt',
+ 'container.json', 'crunchstat.txt']
+ mock_cr().open.return_value = gzip.open(self.logfile)
+ args = crunchstat_summary.command.ArgumentParser().parse_args(
+ ['--job', self.fake_request['uuid']])
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
+ self.diff_known_report(self.logfile, cmd)
+
+
class SummarizeJob(ReportDiff):
fake_job_uuid = '4xphq-8i9sb-jq0ekny1xou3zoh'
fake_log_id = 'fake-log-collection-id'
"ignore": "test",
"package": [
{
- "checksumSHA1": "b68aaMZImS90FjnReAxpbp20FGA=",
+ "checksumSHA1": "jf7K+UTQNIzRdlG5F4zX/8b++/E=",
"origin": "github.com/curoverse/goamz/aws",
"path": "github.com/AdRoll/goamz/aws",
- "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9"
+ "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+ "revisionTime": "2017-07-27T13:52:37Z"
},
{
- "checksumSHA1": "ey9ddXTW9dncjJz/COKpeYm+sgg=",
+ "checksumSHA1": "9nUwQXI+pNxZo6bnR7NslpMpfPI=",
"origin": "github.com/curoverse/goamz/s3",
"path": "github.com/AdRoll/goamz/s3",
- "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9"
+ "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+ "revisionTime": "2017-07-27T13:52:37Z"
},
{
- "checksumSHA1": "pDHYVqUQtRsPYw/X4kUrdK7pxMs=",
+ "checksumSHA1": "tvxbsTkdjB0C/uxEglqD6JfVnMg=",
"origin": "github.com/curoverse/goamz/s3/s3test",
"path": "github.com/AdRoll/goamz/s3/s3test",
- "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9"
+ "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+ "revisionTime": "2017-07-27T13:52:37Z"
},
{
"checksumSHA1": "Rjy2uYZkQ8Kjht6ZFU0qzm2I/kI=",
"revisionTime": "2017-03-24T20:46:54Z"
},
{
- "checksumSHA1": "Gk3jTNQ5uGDUE0WMJFWcYz9PMps=",
+ "checksumSHA1": "q5SZBWFVC3wOIzftf+l/h5WLG1k=",
"path": "github.com/lib/pq/oid",
"revision": "2704adc878c21e1329f46f6e56a1c387d788ff94",
"revisionTime": "2017-03-24T20:46:54Z"
"revisionTime": "2017-05-12T22:20:15Z"
},
{
- "checksumSHA1": "ENl6I8+3AaBanbn9CVExMjDTHPc=",
+ "checksumSHA1": "dUfdXzRJupI9VpqNR2LlppeZvLc=",
"origin": "github.com/docker/docker/vendor/golang.org/x/sys/unix",
"path": "golang.org/x/sys/unix",
"revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",