c = Collection.find(re[1])
input_obj[param_id] = {"class" => primary_type,
"location" => "keep:#{c.portable_data_hash}#{re[4]}",
- "arv:collection" => input_obj[param_id]}
+ "http://arvados.org/cwl#collectionUUID" => re[1]}
end
end
end
chooser_title = "Choose a #{primary_type == 'Directory' ? 'dataset' : 'file'}:"
selection_param = object.class.to_s.underscore + dn
if attrvalue.is_a? Hash
- display_value = attrvalue[:"arv:collection"] || attrvalue[:location]
+ display_value = attrvalue[:"http://arvados.org/cwl#collectionUUID"] || attrvalue[:"arv:collection"] || attrvalue[:location]
re = CollectionsHelper.match_uuid_with_optional_filepath(display_value)
+ locationre = CollectionsHelper.match(attrvalue[:location][5..-1])
if re
- if re[4]
- display_value = "#{Collection.find(re[1]).name} / #{re[4][1..-1]}"
+ if locationre and locationre[4]
+ display_value = "#{Collection.find(re[1]).name} / #{locationre[4][1..-1]}"
else
display_value = Collection.find(re[1]).name
end
render_runtime duration, use_words, round_to_min
end
- # Keep locators are expected to be of the form \"...<pdh/file_path>\"
- JSON_KEEP_LOCATOR_REGEXP = /([0-9a-f]{32}\+\d+[^'"]*?)(?=['"]|\z|$)/
+ # Keep locators are expected to be of the form \"...<pdh/file_path>\" or \"...<uuid/file_path>\"
+ JSON_KEEP_LOCATOR_REGEXP = /([0-9a-f]{32}\+\d+[^'"]*|[a-z0-9]{5}-4zz18-[a-z0-9]{15}[^'"]*)(?=['"]|\z|$)/
def keep_locator_in_json str
# Return a list of all matches
str.scan(JSON_KEEP_LOCATOR_REGEXP).flatten
FROM debian:jessie
MAINTAINER Ward Vandewege <ward@curoverse.com>
+RUN perl -ni~ -e 'print unless /jessie-updates/' /etc/apt/sources.list
+
ENV DEBIAN_FRONTEND noninteractive
# Install dependencies.
FROM debian:8
MAINTAINER Ward Vandewege <wvandewege@veritasgenetics.com>
+RUN perl -ni~ -e 'print unless /jessie-updates/' /etc/apt/sources.list
+
ENV DEBIAN_FRONTEND noninteractive
# Install dependencies
fi
}
-title () {
- txt="********** $1 **********"
- printf "\n%*s%s\n\n" $((($COLUMNS-${#txt})/2)) "" "$txt"
+title() {
+ printf '%s %s\n' "=======" "$1"
}
checkexit() {
if [[ "$1" != "0" ]]; then
- title "!!!!!! $2 FAILED !!!!!!"
+ title "$2 -- FAILED"
failures+=("$2 (`timer`)")
else
successes+=("$2 (`timer`)")
if [[ ${#failures[@]} == 0 ]]
then
- echo "All test suites passed."
+ if [[ ${#successes[@]} != 0 ]]; then
+ echo "All test suites passed."
+ fi
else
echo "Failures (${#failures[@]}):"
for x in "${failures[@]}"
Options:
--skip FOO Do not test the FOO component.
+--skip sanity Skip initial dev environment sanity checks.
+--skip install Do not run any install steps. Just run tests.
+ You should provide GOPATH, GEMHOME, and VENVDIR options
+ from a previous invocation if you use this option.
--only FOO Do not test anything except the FOO component.
--temp DIR Install components and dependencies under DIR instead of
making a new temporary directory. Implies --leave-temp.
subsequent invocations.
--repeat N Repeat each install/test step until it succeeds N times.
--retry Prompt to retry if an install or test suite fails.
---skip-install Do not run any install steps. Just run tests.
- You should provide GOPATH, GEMHOME, and VENVDIR options
- from a previous invocation if you use this option.
--only-install Run specific install step
--short Skip (or scale down) some slow tests.
+--interactive Set up, then prompt for test/install steps to perform.
WORKSPACE=path Arvados source tree to test.
CONFIGSRC=path Dir with api server config files to copy into source tree.
(If none given, leave config files alone in source tree.)
envvar=value Set \$envvar to value. Primarily useful for WORKSPACE,
*_test, and other examples shown above.
-Assuming --skip-install is not given, all components are installed
+Assuming "--skip install" is not given, all components are installed
into \$GOPATH, \$VENDIR, and \$GEMHOME before running any tests. Many
test suites depend on other components being installed, and installing
everything tends to be quicker than debugging dependencies.
lib/dispatchcloud/scheduler
lib/dispatchcloud/ssh_executor
lib/dispatchcloud/worker
+lib/service
services/api
services/arv-git-httpd
services/crunchstat
sdk/java-v2
tools/sync-groups
tools/crunchstat-summary
+tools/crunchstat-summary:py3
tools/keep-exercise
tools/keep-rsync
tools/keep-block-check
exit_cleanly() {
trap - INT
- create-plot-data-from-log.sh $BUILD_NUMBER "$WORKSPACE/apps/workbench/log/test.log" "$WORKSPACE/apps/workbench/log/"
+ if which create-plot-data-from-log.sh >/dev/null; then
+ create-plot-data-from-log.sh $BUILD_NUMBER "$WORKSPACE/apps/workbench/log/test.log" "$WORKSPACE/apps/workbench/log/"
+ fi
rotate_logfile "$WORKSPACE/apps/workbench/log/" "test.log"
stop_services
rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
}
sanity_checks() {
+ [[ -n "${skip[sanity]}" ]] && return 0
( [[ -n "$WORKSPACE" ]] && [[ -d "$WORKSPACE/services" ]] ) \
|| fatal "WORKSPACE environment variable not set to a source directory (see: $0 --help)"
echo Checking dependencies:
--short)
short=1
;;
+ --interactive)
+ interactive=1
+ ;;
--skip-install)
- only_install=nothing
+ skip[install]=1
;;
--only-install)
only_install="$1"; shift
fi
start_services() {
+ if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
+ return 0
+ fi
+ . "$VENVDIR/bin/activate"
echo 'Starting API, keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
if [[ ! -d "$WORKSPACE/services/api/log" ]]; then
mkdir -p "$WORKSPACE/services/api/log"
if [[ -f "$WORKSPACE/tmp/api.pid" && ! -s "$WORKSPACE/tmp/api.pid" ]]; then
rm -f "$WORKSPACE/tmp/api.pid"
fi
+ all_services_stopped=
+ fail=0
cd "$WORKSPACE" \
- && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo fail=1) \
+ && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo "fail=1; false") \
&& export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
&& export ARVADOS_TEST_API_INSTALLED="$$" \
&& python sdk/python/tests/run_test_server.py start_controller \
&& python sdk/python/tests/run_test_server.py start_keep-web \
&& python sdk/python/tests/run_test_server.py start_arv-git-httpd \
&& python sdk/python/tests/run_test_server.py start_ws \
- && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo fail=1) \
- && (env | egrep ^ARVADOS)
- if [[ -n "$fail" ]]; then
- return 1
+ && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo "fail=1; false") \
+ && (env | egrep ^ARVADOS) \
+ || fail=1
+ deactivate
+ if [[ $fail != 0 ]]; then
+ unset ARVADOS_TEST_API_HOST
fi
+ return $fail
}
stop_services() {
- if [[ -z "$ARVADOS_TEST_API_HOST" ]]; then
+ if [[ -n "$all_services_stopped" ]]; then
return
fi
unset ARVADOS_TEST_API_HOST
+ . "$VENVDIR/bin/activate" || return
cd "$WORKSPACE" \
&& python sdk/python/tests/run_test_server.py stop_nginx \
&& python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
&& python sdk/python/tests/run_test_server.py stop_keep-web \
&& python sdk/python/tests/run_test_server.py stop_keep_proxy \
&& python sdk/python/tests/run_test_server.py stop_controller \
- && python sdk/python/tests/run_test_server.py stop
+ && python sdk/python/tests/run_test_server.py stop \
+ && all_services_stopped=1
+ deactivate
}
interrupt() {
}
trap interrupt INT
-sanity_checks
-
-echo "WORKSPACE=$WORKSPACE"
-
-if [[ -z "$CONFIGSRC" ]] && [[ -d "$HOME/arvados-api-server" ]]; then
- # Jenkins expects us to use this by default.
- CONFIGSRC="$HOME/arvados-api-server"
-fi
-
-# Clean up .pyc files that may exist in the workspace
-cd "$WORKSPACE"
-find -name '*.pyc' -delete
-
-if [[ -z "$temp" ]]; then
- temp="$(mktemp -d)"
-fi
-
-# Set up temporary install dirs (unless existing dirs were supplied)
-for tmpdir in VENVDIR VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
-do
- if [[ -z "${!tmpdir}" ]]; then
- eval "$tmpdir"="$temp/$tmpdir"
- fi
- if ! [[ -d "${!tmpdir}" ]]; then
- mkdir "${!tmpdir}" || fatal "can't create ${!tmpdir} (does $temp exist?)"
- fi
-done
-
-rm -vf "${WORKSPACE}/tmp/*.log"
-
setup_ruby_environment() {
if [[ -s "$HOME/.rvm/scripts/rvm" ]] ; then
source "$HOME/.rvm/scripts/rvm"
"$venvdest/bin/pip" install --no-cache-dir 'mock>=1.0' 'pbr<1.7.0'
}
-export PERLINSTALLBASE
-export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
+initialize() {
+ sanity_checks
-export R_LIBS
+ echo "WORKSPACE=$WORKSPACE"
-export GOPATH
-(
- set -e
- mkdir -p "$GOPATH/src/git.curoverse.com"
- if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
- for d in \
- "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
- "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
- "$GOPATH/src/git.curoverse.com/arvados.git"; do
- [[ -d "$d" ]] && rmdir "$d"
- done
+ if [[ -z "$CONFIGSRC" ]] && [[ -d "$HOME/arvados-api-server" ]]; then
+ # Jenkins expects us to use this by default.
+ CONFIGSRC="$HOME/arvados-api-server"
fi
- for d in \
- "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
- "$GOPATH/src/git.curoverse.com/arvados.git"; do
- [[ -h "$d" ]] && rm "$d"
- done
- ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
- go get -v github.com/kardianos/govendor
- cd "$GOPATH/src/git.curoverse.com/arvados.git"
- if [[ -n "$short" ]]; then
- go get -v -d ...
- "$GOPATH/bin/govendor" sync
- else
- # Remove cached source dirs in workdir. Otherwise, they will
- # not qualify as +missing or +external below, and we won't be
- # able to detect that they're missing from vendor/vendor.json.
- rm -rf vendor/*/
- go get -v -d ...
- "$GOPATH/bin/govendor" sync
- [[ -z $("$GOPATH/bin/govendor" list +unused +missing +external | tee /dev/stderr) ]] \
- || fatal "vendor/vendor.json has unused or missing dependencies -- try:
-(export GOPATH=\"${GOPATH}\"; cd \$GOPATH/src/git.curoverse.com/arvados.git && \$GOPATH/bin/govendor add +missing +external && \$GOPATH/bin/govendor remove +unused)
+ # Clean up .pyc files that may exist in the workspace
+ cd "$WORKSPACE"
+ find -name '*.pyc' -delete
-";
+ if [[ -z "$temp" ]]; then
+ temp="$(mktemp -d)"
fi
-) || fatal "Go setup failed"
-setup_virtualenv "$VENVDIR" --python python2.7
-. "$VENVDIR/bin/activate"
+ # Set up temporary install dirs (unless existing dirs were supplied)
+ for tmpdir in VENVDIR VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
+ do
+ if [[ -z "${!tmpdir}" ]]; then
+ eval "$tmpdir"="$temp/$tmpdir"
+ fi
+ if ! [[ -d "${!tmpdir}" ]]; then
+ mkdir "${!tmpdir}" || fatal "can't create ${!tmpdir} (does $temp exist?)"
+ fi
+ done
-# Needed for run_test_server.py which is used by certain (non-Python) tests.
-pip install --no-cache-dir PyYAML \
- || fatal "pip install PyYAML failed"
+ rm -vf "${WORKSPACE}/tmp/*.log"
-# Preinstall libcloud if using a fork; otherwise nodemanager "pip
-# install" won't pick it up by default.
-if [[ -n "$LIBCLOUD_PIN_SRC" ]]; then
- pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
- || pip install --pre --ignore-installed --no-cache-dir "$LIBCLOUD_PIN_SRC" >/dev/null \
- || fatal "pip install apache-libcloud failed"
-fi
+ export PERLINSTALLBASE
+ export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
-# Deactivate Python 2 virtualenv
-deactivate
+ export R_LIBS
-declare -a pythonstuff
-pythonstuff=(
- sdk/pam
- sdk/python
- sdk/python:py3
- sdk/cwl
- sdk/cwl:py3
- services/dockercleaner:py3
- services/fuse
- services/nodemanager
- tools/crunchstat-summary
- )
+ export GOPATH
-# If Python 3 is available, set up its virtualenv in $VENV3DIR.
-# Otherwise, skip dependent tests.
-PYTHON3=$(which python3)
-if [[ ${?} = 0 ]]; then
- setup_virtualenv "$VENV3DIR" --python python3
-else
- PYTHON3=
- cat >&2 <<EOF
+ # Jenkins config requires that glob tmp/*.log match something. Ensure
+ # that happens even if we don't end up running services that set up
+ # logging.
+ mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
+ touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
-Warning: python3 could not be found. Python 3 tests will be skipped.
+ unset http_proxy https_proxy no_proxy
-EOF
-fi
-# Reactivate Python 2 virtualenv
-. "$VENVDIR/bin/activate"
+ # Note: this must be the last time we change PATH, otherwise rvm will
+ # whine a lot.
+ setup_ruby_environment
+
+ echo "PATH is $PATH"
+}
+
+install_env() {
+ (
+ set -e
+ mkdir -p "$GOPATH/src/git.curoverse.com"
+ if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
+ for d in \
+ "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
+ "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
+ "$GOPATH/src/git.curoverse.com/arvados.git"; do
+ [[ -d "$d" ]] && rmdir "$d"
+ done
+ fi
+ for d in \
+ "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
+ "$GOPATH/src/git.curoverse.com/arvados.git"; do
+ [[ -h "$d" ]] && rm "$d"
+ done
+ ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+ go get -v github.com/kardianos/govendor
+ cd "$GOPATH/src/git.curoverse.com/arvados.git"
+ if [[ -n "$short" ]]; then
+ go get -v -d ...
+ "$GOPATH/bin/govendor" sync
+ else
+ # Remove cached source dirs in workdir. Otherwise, they will
+ # not qualify as +missing or +external below, and we won't be
+ # able to detect that they're missing from vendor/vendor.json.
+ rm -rf vendor/*/
+ go get -v -d ...
+ "$GOPATH/bin/govendor" sync
+ [[ -z $("$GOPATH/bin/govendor" list +unused +missing +external | tee /dev/stderr) ]] \
+ || fatal "vendor/vendor.json has unused or missing dependencies -- try:
+
+(export GOPATH=\"${GOPATH}\"; cd \$GOPATH/src/git.curoverse.com/arvados.git && \$GOPATH/bin/govendor add +missing +external && \$GOPATH/bin/govendor remove +unused)
-# Note: this must be the last time we change PATH, otherwise rvm will
-# whine a lot.
-setup_ruby_environment
+";
+ fi
+ ) || fatal "Go setup failed"
-echo "PATH is $PATH"
+ setup_virtualenv "$VENVDIR" --python python2.7
+ . "$VENVDIR/bin/activate"
-if ! which bundler >/dev/null
-then
- gem install --user-install bundler || fatal 'Could not install bundler'
-fi
+ # Needed for run_test_server.py which is used by certain (non-Python) tests.
+ pip install --no-cache-dir PyYAML \
+ || fatal "pip install PyYAML failed"
+
+ # Preinstall libcloud if using a fork; otherwise nodemanager "pip
+ # install" won't pick it up by default.
+ if [[ -n "$LIBCLOUD_PIN_SRC" ]]; then
+ pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
+ || pip install --pre --ignore-installed --no-cache-dir "$LIBCLOUD_PIN_SRC" >/dev/null \
+ || fatal "pip install apache-libcloud failed"
+ fi
-# Jenkins config requires that glob tmp/*.log match something. Ensure
-# that happens even if we don't end up running services that set up
-# logging.
-mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
-touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
+ # Deactivate Python 2 virtualenv
+ deactivate
+
+ # If Python 3 is available, set up its virtualenv in $VENV3DIR.
+ # Otherwise, skip dependent tests.
+ PYTHON3=$(which python3)
+ if [[ ${?} = 0 ]]; then
+ setup_virtualenv "$VENV3DIR" --python python3
+ else
+ PYTHON3=
+ cat >&2 <<EOF
+
+Warning: python3 could not be found. Python 3 tests will be skipped.
+
+EOF
+ fi
+
+ if ! which bundler >/dev/null
+ then
+ gem install --user-install bundler || fatal 'Could not install bundler'
+ fi
+}
retry() {
remain="${repeat}"
if ${@}; then
if [[ "$remain" -gt 1 ]]; then
remain=$((${remain}-1))
- title "Repeating ${remain} more times"
+ title "(repeating ${remain} more times)"
else
break
fi
suite="${1}"
;;
esac
- if [[ -z "${skip[$suite]}" && -z "${skip[$1]}" && \
- (${#only[@]} -eq 0 || ${only[$suite]} -eq 1 || \
- ${only[$1]} -eq 1) ||
- ${only[$2]} -eq 1 ]]; then
- retry do_test_once ${@}
- else
- title "Skipping ${1} tests"
+ if [[ -n "${skip[$suite]}" || \
+ -n "${skip[$1]}" || \
+ (${#only[@]} -ne 0 && ${only[$suite]} -eq 0 && ${only[$1]} -eq 0) ]]; then
+ return 0
fi
+ case "${1}" in
+ services/api)
+ stop_services
+ ;;
+ doc | lib/cli | lib/cloud/azure | lib/cloud/ec2 | lib/cmd | lib/dispatchcloud/ssh_executor | lib/dispatchcloud/worker)
+ # don't care whether services are running
+ ;;
+ *)
+ if ! start_services; then
+ title "test $1 -- failed to start services"
+ return 1
+ fi
+ ;;
+ esac
+ retry do_test_once ${@}
}
do_test_once() {
unset result
- title "Running $1 tests"
+ title "test $1"
timer_reset
- if [[ "$2" == "go" ]]
+
+ if which deactivate >/dev/null; then deactivate; fi
+ if ! . "$VENVDIR/bin/activate"
+ then
+ result=1
+ elif [[ "$2" == "go" ]]
then
covername="coverage-$(echo "$1" | sed -e 's/\//_/g')"
coverflags=("-covermode=count" "-coverprofile=$WORKSPACE/tmp/.$covername.tmp")
fi
result=${result:-$?}
checkexit $result "$1 tests"
- title "End of $1 tests (`timer`)"
+ title "test $1 -- `timer`"
return $result
}
do_install() {
- skipit=false
-
- if [[ -z "${only_install}" || "${only_install}" == "${1}" || "${only_install}" == "${2}" ]]; then
- retry do_install_once ${@}
- else
- skipit=true
- fi
-
- if [[ "$skipit" = true ]]; then
- title "Skipping $1 install"
- fi
+ if [[ -n "${skip[install]}" || ( -n "${only_install}" && "${only_install}" != "${1}" && "${only_install}" != "${2}" ) ]]; then
+ return 0
+ fi
+ retry do_install_once ${@}
}
do_install_once() {
- title "Running $1 install"
+ title "install $1"
timer_reset
- if [[ "$2" == "go" ]]
+
+ if which deactivate >/dev/null; then deactivate; fi
+ if [[ "$1" != "env" ]] && ! . "$VENVDIR/bin/activate"; then
+ result=1
+ elif [[ "$2" == "go" ]]
then
go get -ldflags "-X main.version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}" -t "git.curoverse.com/arvados.git/$1"
elif [[ "$2" == "pip" ]]
else
"install_$1"
fi
- result=$?
+ result=${result:-$?}
checkexit $result "$1 install"
- title "End of $1 install (`timer`)"
+ title "install $1 -- `timer`"
return $result
}
&& bundle_install_trylocal \
&& rm -rf .site
}
-do_install doc
install_gem() {
gemname=$1
&& with_test_gemset gem install --no-ri --no-rdoc $(ls -t "$gemname"-*.gem|head -n1)
}
-install_ruby_sdk() {
+install_sdk/ruby() {
install_gem arvados sdk/ruby
}
-do_install sdk/ruby ruby_sdk
-install_R_sdk() {
+install_sdk/R() {
if [[ "$NEED_SDK_R" = true ]]; then
cd "$WORKSPACE/sdk/R" \
&& Rscript --vanilla install_deps.R
fi
}
-do_install sdk/R R_sdk
-install_perl_sdk() {
+install_sdk/perl() {
cd "$WORKSPACE/sdk/perl" \
&& perl Makefile.PL INSTALL_BASE="$PERLINSTALLBASE" \
&& make install INSTALLDIRS=perl
}
-do_install sdk/perl perl_sdk
-install_cli() {
+install_sdk/cli() {
install_gem arvados-cli sdk/cli
}
-do_install sdk/cli cli
-install_login-sync() {
+install_services/login-sync() {
install_gem arvados-login-sync services/login-sync
}
-do_install services/login-sync login-sync
-
-# Install the Python SDK early. Various other test suites (like
-# keepproxy) bring up run_test_server.py, which imports the arvados
-# module. We can't actually *test* the Python SDK yet though, because
-# its own test suite brings up some of those other programs (like
-# keepproxy).
-for p in "${pythonstuff[@]}"
-do
- dir=${p%:py3}
- if [[ ${dir} = ${p} ]]; then
- if [[ -z ${skip[python2]} ]]; then
- do_install ${dir} pip
- fi
- elif [[ -n ${PYTHON3} ]]; then
- if [[ -z ${skip[python3]} ]]; then
- do_install ${dir} pip "$VENV3DIR/bin/"
- fi
- fi
-done
-install_apiserver() {
+install_services/api() {
cd "$WORKSPACE/services/api" \
&& RAILS_ENV=test bundle_install_trylocal
&& RAILS_ENV=test bundle exec rake db:setup \
&& RAILS_ENV=test bundle exec rake db:fixtures:load
}
-do_install services/api apiserver
+
+declare -a pythonstuff
+pythonstuff=(
+ sdk/pam
+ sdk/python
+ sdk/python:py3
+ sdk/cwl
+ sdk/cwl:py3
+ services/dockercleaner:py3
+ services/fuse
+ services/nodemanager
+ tools/crunchstat-summary
+)
declare -a gostuff
gostuff=(
lib/dispatchcloud/scheduler
lib/dispatchcloud/ssh_executor
lib/dispatchcloud/worker
+ lib/service
sdk/go/arvados
sdk/go/arvadosclient
sdk/go/auth
tools/keep-rsync
tools/sync-groups
)
-for g in "${gostuff[@]}"
-do
- do_install "$g" go
-done
-install_workbench() {
+install_apps/workbench() {
cd "$WORKSPACE/apps/workbench" \
&& mkdir -p tmp/cache \
&& RAILS_ENV=test bundle_install_trylocal \
&& RAILS_ENV=test RAILS_GROUPS=assets bundle exec rake npm:install
}
-do_install apps/workbench workbench
-unset http_proxy https_proxy no_proxy
-
-test_doclinkchecker() {
+test_doc() {
(
set -e
cd "$WORKSPACE/doc"
PYTHONPATH=$WORKSPACE/sdk/python/ bundle exec rake linkchecker baseurl=file://$WORKSPACE/doc/.site/ arvados_workbench_host=https://workbench.$ARVADOS_API_HOST arvados_api_host=$ARVADOS_API_HOST
)
}
-do_test doc doclinkchecker
-
-stop_services
-test_apiserver() {
+test_services/api() {
rm -f "$WORKSPACE/services/api/git-commit.version"
cd "$WORKSPACE/services/api" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test TESTOPTS=-v ${testargs[services/api]}
}
-do_test services/api apiserver
-
-# Shortcut for when we're only running apiserver tests. This saves a bit of time,
-# because we don't need to start up the api server for subsequent tests.
-if [ ! -z "$only" ] && [ "$only" == "services/api" ]; then
- rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
- exit_cleanly
-fi
-
-start_services || { stop_services; fatal "start_services"; }
-test_ruby_sdk() {
+test_sdk/ruby() {
cd "$WORKSPACE/sdk/ruby" \
&& bundle exec rake test TESTOPTS=-v ${testargs[sdk/ruby]}
}
-do_test sdk/ruby ruby_sdk
-test_R_sdk() {
+test_sdk/R() {
if [[ "$NEED_SDK_R" = true ]]; then
cd "$WORKSPACE/sdk/R" \
&& Rscript --vanilla run_test.R
fi
}
-do_test sdk/R R_sdk
-
-test_cli() {
+test_sdk/cli() {
cd "$WORKSPACE/sdk/cli" \
&& mkdir -p /tmp/keep \
&& KEEP_LOCAL_STORE=/tmp/keep bundle exec rake test TESTOPTS=-v ${testargs[sdk/cli]}
}
-do_test sdk/cli cli
-test_java_v2_sdk() {
+test_sdk/java-v2() {
cd "$WORKSPACE/sdk/java-v2" && ./gradlew test
}
-do_test sdk/java-v2 java_v2_sdk
-test_login-sync() {
+test_services/login-sync() {
cd "$WORKSPACE/services/login-sync" \
&& bundle exec rake test TESTOPTS=-v ${testargs[services/login-sync]}
}
-do_test services/login-sync login-sync
-test_nodemanager_integration() {
+test_services/nodemanager_integration() {
cd "$WORKSPACE/services/nodemanager" \
&& tests/integration_test.py ${testargs[services/nodemanager_integration]}
}
-do_test services/nodemanager_integration nodemanager_integration
-for p in "${pythonstuff[@]}"
-do
- dir=${p%:py3}
- if [[ ${dir} = ${p} ]]; then
- if [[ -z ${skip[python2]} ]]; then
- do_test ${dir} pip
- fi
- elif [[ -n ${PYTHON3} ]]; then
- if [[ -z ${skip[python3]} ]]; then
- do_test ${dir} pip "$VENV3DIR/bin/"
- fi
- fi
-done
-
-for g in "${gostuff[@]}"
-do
- do_test "$g" go
-done
-
-test_workbench_units() {
+test_apps/workbench_units() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:units TESTOPTS=-v ${testargs[apps/workbench]}
}
-do_test apps/workbench_units workbench_units
-test_workbench_functionals() {
+test_apps/workbench_functionals() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:functionals TESTOPTS=-v ${testargs[apps/workbench]}
}
-do_test apps/workbench_functionals workbench_functionals
-test_workbench_integration() {
+test_apps/workbench_integration() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:integration TESTOPTS=-v ${testargs[apps/workbench]}
}
-do_test apps/workbench_integration workbench_integration
-
-test_workbench_benchmark() {
+test_apps/workbench_benchmark() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:benchmark ${testargs[apps/workbench_benchmark]}
}
-do_test apps/workbench_benchmark workbench_benchmark
-test_workbench_profile() {
+test_apps/workbench_profile() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:profile ${testargs[apps/workbench_profile]}
}
-do_test apps/workbench_profile workbench_profile
+install_deps() {
+ # Install parts needed by test suites
+ do_install env
+ do_install cmd/arvados-server go
+ do_install sdk/cli
+ do_install sdk/perl
+ do_install sdk/python pip
+ do_install sdk/ruby
+ do_install services/api
+ do_install services/arv-git-httpd go
+ do_install services/keepproxy go
+ do_install services/keepstore go
+ do_install services/keep-web go
+ do_install services/ws go
+}
+
+install_all() {
+ do_install env
+ do_install doc
+ do_install sdk/ruby
+ do_install sdk/R
+ do_install sdk/perl
+ do_install sdk/cli
+ do_install services/login-sync
+ for p in "${pythonstuff[@]}"
+ do
+ dir=${p%:py3}
+ if [[ ${dir} = ${p} ]]; then
+ if [[ -z ${skip[python2]} ]]; then
+ do_install ${dir} pip
+ fi
+ elif [[ -n ${PYTHON3} ]]; then
+ if [[ -z ${skip[python3]} ]]; then
+ do_install ${dir} pip "$VENV3DIR/bin/"
+ fi
+ fi
+ done
+ do_install services/api
+ for g in "${gostuff[@]}"
+ do
+ do_install "$g" go
+ done
+ do_install apps/workbench
+}
+
+test_all() {
+ stop_services
+ do_test services/api
+
+ # Shortcut for when we're only running apiserver tests. This saves a bit of time,
+ # because we don't need to start up the api server for subsequent tests.
+ if [ ! -z "$only" ] && [ "$only" == "services/api" ]; then
+ rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
+ exit_cleanly
+ fi
+
+ do_test doc
+ do_test sdk/ruby
+ do_test sdk/R
+ do_test sdk/cli
+ do_test services/login-sync
+ do_test sdk/java-v2
+ do_test services/nodemanager_integration
+ for p in "${pythonstuff[@]}"
+ do
+ dir=${p%:py3}
+ if [[ ${dir} = ${p} ]]; then
+ if [[ -z ${skip[python2]} ]]; then
+ do_test ${dir} pip
+ fi
+ elif [[ -n ${PYTHON3} ]]; then
+ if [[ -z ${skip[python3]} ]]; then
+ do_test ${dir} pip "$VENV3DIR/bin/"
+ fi
+ fi
+ done
+
+ for g in "${gostuff[@]}"
+ do
+ do_test "$g" go
+ done
+ do_test apps/workbench_units
+ do_test apps/workbench_functionals
+ do_test apps/workbench_integration
+ do_test apps/workbench_benchmark
+ do_test apps/workbench_profile
+}
+
+help_interactive() {
+ echo "== Interactive commands:"
+ echo "TARGET (short for 'test DIR')"
+ echo "test TARGET"
+ echo "test TARGET:py3 (test with python3)"
+ echo "test TARGET -check.vv (pass arguments to test)"
+ echo "install TARGET"
+ echo "install env (go/python libs)"
+ echo "install deps (go/python libs + arvados components needed for integration tests)"
+ echo "reset (...services used by integration tests)"
+ echo "exit"
+ echo "== Test targets:"
+ echo "${!testfuncargs[@]}" | tr ' ' '\n' | sort | column
+}
+
+initialize
+
+declare -A testfuncargs=()
+for g in "${gostuff[@]}"; do
+ testfuncargs[$g]="$g go"
+done
+for p in "${pythonstuff[@]}"; do
+ dir=${p%:py3}
+ if [[ ${dir} = ${p} ]]; then
+ testfuncargs[$p]="$dir pip $VENVDIR/bin/"
+ else
+ testfuncargs[$p]="$dir pip $VENV3DIR/bin/"
+ fi
+done
+
+if [[ -z ${interactive} ]]; then
+ install_all
+ test_all
+else
+ skip=()
+ only=()
+ only_install=()
+ if [[ -e "$VENVDIR/bin/activate" ]]; then stop_services; fi
+ setnextcmd() {
+ if [[ "$nextcmd" != "install deps" ]]; then
+ :
+ elif [[ -e "$VENVDIR/bin/activate" ]]; then
+ nextcmd="test lib/cmd"
+ else
+ nextcmd="install deps"
+ fi
+ }
+ echo
+ help_interactive
+ nextcmd="install deps"
+ setnextcmd
+ while read -p 'What next? ' -e -i "${nextcmd}" nextcmd; do
+ read verb target opts <<<"${nextcmd}"
+ case "${verb}" in
+ "" | "help")
+ help_interactive
+ ;;
+ "exit" | "quit")
+ exit_cleanly
+ ;;
+ "reset")
+ stop_services
+ ;;
+ *)
+ target="${target%/}"
+ testargs["$target"]="${opts}"
+ case "$target" in
+ all | deps)
+ ${verb}_${target}
+ ;;
+ *)
+ tt="${testfuncargs[${target}]}"
+ tt="${tt:-$target}"
+ do_$verb $tt
+ ;;
+ esac
+ ;;
+ esac
+ if [[ ${#successes[@]} -gt 0 || ${#failures[@]} -gt 0 ]]; then
+ report_outcomes
+ successes=()
+ failures=()
+ fi
+ cd "$WORKSPACE"
+ setnextcmd
+ done
+ echo
+fi
exit_cleanly
--- /dev/null
+#!/usr/bin/env cwl-runner
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: CC-BY-SA-3.0
+
+# Example input file providing both content addresses and UUIDs. The
+# collections identified by 'collectionUUID' will be checked that the
+# current content of the collection record matches the content address
+# in the 'location' field.
+
+$namespaces:
+ arv: 'http://arvados.org/cwl#'
+
+cwl:tool: bwa-mem.cwl
+reference:
+ class: File
+ location: keep:2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt
+ arv:collectionUUID: qr1hi-4zz18-pwid4w22a40jp8l
+read_p1:
+ class: File
+ location: keep:ae480c5099b81e17267b7445e35b4bc7+180/HWI-ST1027_129_D0THKACXX.1_1.fastq
+ arv:collectionUUID: qr1hi-4zz18-h615rgfmqt3wje0
+read_p2:
+ class: File
+ location: keep:ae480c5099b81e17267b7445e35b4bc7+180/HWI-ST1027_129_D0THKACXX.1_2.fastq
+ arv:collectionUUID: qr1hi-4zz18-h615rgfmqt3wje0
+group_id: arvados_tutorial
+sample_id: HWI-ST1027_129
+PL: illumina
--- /dev/null
+#!/usr/bin/env cwl-runner
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: CC-BY-SA-3.0
+
+# Example input file using UUIDs to reference input collections. These
+# will be resolved to content addresses before running the workflow.
+
+cwl:tool: bwa-mem.cwl
+reference:
+ class: File
+ location: keep:qr1hi-4zz18-pwid4w22a40jp8l/19.fasta.bwt
+read_p1:
+ class: File
+ location: keep:qr1hi-4zz18-h615rgfmqt3wje0/HWI-ST1027_129_D0THKACXX.1_1.fastq
+read_p2:
+ class: File
+ location: keep:qr1hi-4zz18-h615rgfmqt3wje0/HWI-ST1027_129_D0THKACXX.1_2.fastq
+group_id: arvados_tutorial
+sample_id: HWI-ST1027_129
+PL: illumina
2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Overall process status is success
{
"aligned_sam": {
- "path": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
+ "location": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
"checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
"class": "File",
"size": 30738986
When running a workflow on an Arvados cluster, the input files must be stored in Keep. There are several ways this can happen.
-A URI reference to Keep uses the @keep:@ scheme followed by the portable data hash, collection size, and path to the file inside the collection. For example, @keep:2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt@.
+A URI reference to Keep uses the @keep:@ scheme followed by either the portable data hash or UUID of the collection and then the location of the file inside the collection. For example, @keep:2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt@ or @keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/19.fasta.bwt@.
If you reference a file in "arv-mount":{{site.baseurl}}/user/tutorials/tutorial-keep-mount.html, such as @/home/example/keep/by_id/2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt@, then @arvados-cwl-runner@ will automatically determine the appropriate Keep URI reference.
2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Overall process status is success
{
"aligned_sam": {
- "path": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
+ "location": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
"checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
"class": "File",
"size": 30738986
az.stopWg.Add(1)
defer az.stopWg.Done()
+ if instanceType.AddedScratch > 0 {
+ return nil, fmt.Errorf("cannot create instance type %q: driver does not implement non-zero AddedScratch (%d)", instanceType.Name, instanceType.AddedScratch)
+ }
+
name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
if err != nil {
return nil, err
func (ai *azureInstance) Address() string {
if ai.nic.IPConfigurations != nil &&
len(*ai.nic.IPConfigurations) > 0 &&
- (*ai.nic.IPConfigurations)[0].PrivateIPAddress != nil {
+ (*ai.nic.IPConfigurations)[0].InterfaceIPConfigurationPropertiesFormat != nil &&
+ (*ai.nic.IPConfigurations)[0].InterfaceIPConfigurationPropertiesFormat.PrivateIPAddress != nil {
return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
}
var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
-func newHandler(_ context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+func newHandler(_ context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile, _ string) service.Handler {
return &Handler{Cluster: cluster, NodeProfile: np}
}
},
}
node := s.cluster.NodeProfiles["*"]
- s.handler = newHandler(s.ctx, s.cluster, &node)
+ s.handler = newHandler(s.ctx, s.cluster, &node, "")
}
func (s *HandlerSuite) TearDownTest(c *check.C) {
"Keep-Alive": true,
"Proxy-Authenticate": true,
"Proxy-Authorization": true,
+ // this line makes gofmt 1.10 and 1.11 agree
"TE": true,
"Trailer": true,
"Transfer-Encoding": true, // *-Encoding headers interfere with Go's automatic compression/decompression
import (
"context"
+ "fmt"
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/service"
var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
-func newHandler(ctx context.Context, cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
- d := &dispatcher{Cluster: cluster, Context: ctx}
+func newHandler(ctx context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile, token string) service.Handler {
+ ac, err := arvados.NewClientFromConfig(cluster)
+ if err != nil {
+ return service.ErrorHandler(ctx, cluster, np, fmt.Errorf("error initializing client from cluster config: %s", err))
+ }
+ d := &dispatcher{
+ Cluster: cluster,
+ Context: ctx,
+ ArvClient: ac,
+ AuthToken: token,
+ }
go d.Start()
return d
}
package container
import (
+ "errors"
"io"
"sync"
"time"
// cache up to date.
type Queue struct {
logger logrus.FieldLogger
- reg *prometheus.Registry
chooseType typeChooser
client APIClient
// Arvados cluster's queue during Update, chooseType will be called to
// assign an appropriate arvados.InstanceType for the queue entry.
func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
- return &Queue{
+ cq := &Queue{
logger: logger,
- reg: reg,
chooseType: chooseType,
client: client,
current: map[string]QueueEnt{},
subscribers: map[<-chan struct{}]chan struct{}{},
}
+ if reg != nil {
+ go cq.runMetrics(reg)
+ }
+ return cq
}
// Subscribe returns a channel that becomes ready to receive when an
}
apply(avail)
- var missing []string
+ missing := map[string]bool{}
cq.mtx.Lock()
for uuid, ent := range cq.current {
if next[uuid] == nil &&
ent.Container.State != arvados.ContainerStateCancelled &&
ent.Container.State != arvados.ContainerStateComplete {
- missing = append(missing, uuid)
+ missing[uuid] = true
}
}
cq.mtx.Unlock()
- for i, page := 0, 20; i < len(missing); i += page {
- batch := missing[i:]
- if len(batch) > page {
- batch = batch[:page]
+ for len(missing) > 0 {
+ var batch []string
+ for uuid := range missing {
+ batch = append(batch, uuid)
+ if len(batch) == 20 {
+ break
+ }
}
+ filters := []arvados.Filter{{"uuid", "in", batch}}
ended, err := cq.fetchAll(arvados.ResourceListParams{
Select: selectParam,
Order: "uuid",
Count: "none",
- Filters: []arvados.Filter{{"uuid", "in", batch}},
+ Filters: filters,
})
if err != nil {
return nil, err
}
apply(ended)
+ if len(ended) == 0 {
+ // This is the only case where we can conclude
+ // a container has been deleted from the
+ // database. A short (but non-zero) page, on
+ // the other hand, can be caused by a response
+ // size limit.
+ for _, uuid := range batch {
+ cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
+ delete(missing, uuid)
+ cq.mtx.Lock()
+ cq.delEnt(uuid, cq.current[uuid].Container.State)
+ cq.mtx.Unlock()
+ }
+ continue
+ }
+ for _, ctr := range ended {
+ if _, ok := missing[ctr.UUID]; !ok {
+ msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
+ cq.logger.WithFields(logrus.Fields{
+ "ContainerUUID": ctr.UUID,
+ "Filters": filters,
+ }).Error(msg)
+ return nil, errors.New(msg)
+ }
+ delete(missing, ctr.UUID)
+ }
}
return next, nil
}
}
return results, nil
}
+
+func (cq *Queue) runMetrics(reg *prometheus.Registry) {
+ mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "queue_entries",
+ Help: "Number of active container entries in the controller database.",
+ }, []string{"state", "instance_type"})
+ reg.MustRegister(mEntries)
+
+ type entKey struct {
+ state arvados.ContainerState
+ inst string
+ }
+ count := map[entKey]int{}
+
+ ch := cq.Subscribe()
+ defer cq.Unsubscribe(ch)
+ for range ch {
+ for k := range count {
+ count[k] = 0
+ }
+ ents, _ := cq.Entries()
+ for _, ent := range ents {
+ count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+ }
+ for k, v := range count {
+ mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
+ }
+ }
+}
scheduler.WorkerPool
Instances() []worker.InstanceView
SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
+ KillInstance(id cloud.InstanceID, reason string) error
Stop()
}
type dispatcher struct {
Cluster *arvados.Cluster
Context context.Context
+ ArvClient *arvados.Client
+ AuthToken string
InstanceSetID cloud.InstanceSetID
logger logrus.FieldLogger
}
func (disp *dispatcher) initialize() {
- arvClient := arvados.NewClientFromEnv()
+ disp.logger = ctxlog.FromContext(disp.Context)
+
+ disp.ArvClient.AuthToken = disp.AuthToken
+
if disp.InstanceSetID == "" {
- if strings.HasPrefix(arvClient.AuthToken, "v2/") {
- disp.InstanceSetID = cloud.InstanceSetID(strings.Split(arvClient.AuthToken, "/")[1])
+ if strings.HasPrefix(disp.AuthToken, "v2/") {
+ disp.InstanceSetID = cloud.InstanceSetID(strings.Split(disp.AuthToken, "/")[1])
} else {
// Use some other string unique to this token
// that doesn't reveal the token itself.
- disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(arvClient.AuthToken))))
+ disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(disp.AuthToken))))
}
}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
- disp.logger = ctxlog.FromContext(disp.Context)
if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
}
disp.instanceSet = instanceSet
disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
- disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
+ disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
+ disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, disp.ArvClient)
if disp.Cluster.ManagementToken == "" {
disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
ErrorLog: disp.logger,
})
disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
}
+// Management API: shutdown/destroy specified instance now.
+func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
+ id := cloud.InstanceID(r.FormValue("instance_id"))
+ if id == "" {
+ httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+ return
+ }
+ err := disp.pool.KillInstance(id, "via management API: "+r.FormValue("reason"))
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusNotFound)
+ return
+ }
+}
+
func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
id := cloud.InstanceID(r.FormValue("instance_id"))
if id == "" {
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/ctxlog"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
s.cluster = &arvados.Cluster{
CloudVMs: arvados.CloudVMs{
- Driver: "test",
- SyncInterval: arvados.Duration(10 * time.Millisecond),
- TimeoutIdle: arvados.Duration(150 * time.Millisecond),
- TimeoutBooting: arvados.Duration(150 * time.Millisecond),
- TimeoutProbe: arvados.Duration(15 * time.Millisecond),
- TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ Driver: "test",
+ SyncInterval: arvados.Duration(10 * time.Millisecond),
+ TimeoutIdle: arvados.Duration(150 * time.Millisecond),
+ TimeoutBooting: arvados.Duration(150 * time.Millisecond),
+ TimeoutProbe: arvados.Duration(15 * time.Millisecond),
+ TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ MaxCloudOpsPerSecond: 500,
},
Dispatch: arvados.Dispatch{
PrivateKey: string(dispatchprivraw),
ProbeInterval: arvados.Duration(5 * time.Millisecond),
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
MaxProbesPerSecond: 1000,
+ TimeoutSignal: arvados.Duration(3 * time.Millisecond),
+ TimeoutTERM: arvados.Duration(20 * time.Millisecond),
},
InstanceTypes: arvados.InstanceTypeMap{
test.InstanceType(1).Name: test.InstanceType(1),
DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
},
},
+ Services: arvados.Services{
+ Controller: arvados.Service{ExternalURL: arvados.URL{Scheme: "https", Host: os.Getenv("ARVADOS_API_HOST")}},
+ },
}
+
+ arvClient, err := arvados.NewClientFromConfig(s.cluster)
+ c.Check(err, check.IsNil)
+
s.disp = &dispatcher{
- Cluster: s.cluster,
- Context: s.ctx,
+ Cluster: s.cluster,
+ Context: s.ctx,
+ ArvClient: arvClient,
+ AuthToken: arvadostest.AdminToken,
}
// Test cases can modify s.cluster before calling
// initialize(), and then modify private state before calling
for _, ctr := range queue.Containers {
waiting[ctr.UUID] = struct{}{}
}
- executeContainer := func(ctr arvados.Container) int {
+ finishContainer := func(ctr arvados.Container) {
mtx.Lock()
defer mtx.Unlock()
if _, ok := waiting[ctr.UUID]; !ok {
- c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
- return 1
+ c.Errorf("container completed twice: %s", ctr.UUID)
+ return
}
delete(waiting, ctr.UUID)
if len(waiting) == 0 {
close(done)
}
+ }
+ executeContainer := func(ctr arvados.Container) int {
+ finishContainer(ctr)
return int(rand.Uint32() & 0x3)
}
n := 0
stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
stubvm.ExecuteContainer = executeContainer
+ stubvm.CrashRunningContainer = finishContainer
switch n % 7 {
case 0:
stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
case 1:
stubvm.CrunchRunMissing = true
+ case 2:
+ stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
default:
stubvm.CrunchRunCrashRate = 0.1
}
c.Check(ok, check.Equals, true)
<-ch
- sr = getInstances()
+ for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
+ sr = getInstances()
+ if len(sr.Items) > 0 {
+ break
+ }
+ time.Sleep(time.Millisecond)
+ }
c.Assert(len(sr.Items), check.Equals, 1)
c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
import (
"fmt"
+ "time"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/cloud/azure"
"git.curoverse.com/arvados.git/lib/cloud/ec2"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
)
var drivers = map[string]cloud.Driver{
if !ok {
return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
}
- return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+ is, err := driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+ if maxops := cluster.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
+ is = &rateLimitedInstanceSet{
+ InstanceSet: is,
+ ticker: time.NewTicker(time.Second / time.Duration(maxops)),
+ }
+ }
+ return is, err
+}
+
+type rateLimitedInstanceSet struct {
+ cloud.InstanceSet
+ ticker *time.Ticker
+}
+
+func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+ <-is.ticker.C
+ inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
+ return &rateLimitedInstance{inst, is.ticker}, err
+}
+
+type rateLimitedInstance struct {
+ cloud.Instance
+ ticker *time.Ticker
+}
+
+func (inst *rateLimitedInstance) Destroy() error {
+ <-inst.ticker.C
+ return inst.Instance.Destroy()
}
// Give up.
break waiting
}
-
}
for _, uuid := range stale {
Create(arvados.InstanceType) bool
Shutdown(arvados.InstanceType) bool
StartContainer(arvados.InstanceType, arvados.Container) bool
- KillContainer(uuid string)
+ KillContainer(uuid, reason string)
Subscribe() <-chan struct{}
Unsubscribe(<-chan struct{})
}
p.unalloc[it]++
return true
}
-func (p *stubPool) KillContainer(uuid string) {
+func (p *stubPool) KillContainer(uuid, reason string) {
p.Lock()
defer p.Unlock()
delete(p.running, uuid)
}
c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
}
+
+func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+ pool := stubPool{
+ unalloc: map[arvados.InstanceType]int{
+ test.InstanceType(2): 0,
+ },
+ idle: map[arvados.InstanceType]int{
+ test.InstanceType(2): 0,
+ },
+ running: map[string]time.Time{
+ test.ContainerUUID(2): time.Time{},
+ },
+ }
+ queue := test.Queue{
+ ChooseType: chooseType,
+ Containers: []arvados.Container{
+ {
+ // create a new worker
+ UUID: test.ContainerUUID(1),
+ Priority: 1,
+ State: arvados.ContainerStateLocked,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
+ },
+ },
+ }
+ queue.Update()
+ sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+ c.Check(pool.running, check.HasLen, 1)
+ sch.sync()
+ for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+ }
+ c.Check(pool.Running(), check.HasLen, 0)
+}
switch ent.Container.State {
case arvados.ContainerStateRunning:
if !running {
- go sch.cancel(ent, "not running on any worker")
+ go sch.cancel(uuid, "not running on any worker")
} else if !exited.IsZero() && qUpdated.After(exited) {
- go sch.cancel(ent, "state=\"Running\" after crunch-run exited")
+ go sch.cancel(uuid, "state=Running after crunch-run exited")
} else if ent.Container.Priority == 0 {
- go sch.kill(ent, "priority=0")
+ go sch.kill(uuid, "priority=0")
}
case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
if running {
// of kill() will be to make the
// worker available for the next
// container.
- go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
} else {
sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
// a network outage and is still
// preparing to run a container that
// has already been unlocked/requeued.
- go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
}
case arvados.ContainerStateLocked:
if running && !exited.IsZero() && qUpdated.After(exited) {
go sch.requeue(ent, "crunch-run exited")
} else if running && exited.IsZero() && ent.Container.Priority == 0 {
- go sch.kill(ent, "priority=0")
+ go sch.kill(uuid, "priority=0")
} else if !running && ent.Container.Priority == 0 {
go sch.requeue(ent, "priority=0")
}
}).Error("BUG: unexpected state")
}
}
+ for uuid := range running {
+ if _, known := qEntries[uuid]; !known {
+ go sch.kill(uuid, "not in queue")
+ }
+ }
}
-func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
- uuid := ent.Container.UUID
+func (sch *Scheduler) cancel(uuid string, reason string) {
if !sch.uuidLock(uuid, "cancel") {
return
}
}
}
-func (sch *Scheduler) kill(ent container.QueueEnt, reason string) {
- uuid := ent.Container.UUID
- logger := sch.logger.WithField("ContainerUUID", uuid)
- logger.Debugf("killing crunch-run process because %s", reason)
- sch.pool.KillContainer(uuid)
+func (sch *Scheduler) kill(uuid string, reason string) {
+ sch.pool.KillContainer(uuid, reason)
}
func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
//
// The resulting changes are not exposed through Get() or Entries()
// until the next call to Update().
-func (q *Queue) Notify(upd arvados.Container) {
+//
+// Return value is true unless the update is rejected (invalid state
+// transition).
+func (q *Queue) Notify(upd arvados.Container) bool {
q.mtx.Lock()
defer q.mtx.Unlock()
for i, ctr := range q.Containers {
if ctr.UUID == upd.UUID {
- q.Containers[i] = upd
- return
+ if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+ q.Containers[i] = upd
+ return true
+ }
+ return false
}
}
q.Containers = append(q.Containers, upd)
+ return true
}
tags: copyTags(tags),
providerType: it.ProviderType,
initCommand: cmd,
+ running: map[string]int64{},
+ killing: map[string]bool{},
}
svm.SSHService = SSHService{
HostKey: sis.driver.HostKey,
// running (and might change IP addresses, shut down, etc.) without
// updating any stubInstances that have been returned to callers.
type StubVM struct {
- Boot time.Time
- Broken time.Time
- CrunchRunMissing bool
- CrunchRunCrashRate float64
- CrunchRunDetachDelay time.Duration
- ExecuteContainer func(arvados.Container) int
+ Boot time.Time
+ Broken time.Time
+ ReportBroken time.Time
+ CrunchRunMissing bool
+ CrunchRunCrashRate float64
+ CrunchRunDetachDelay time.Duration
+ ExecuteContainer func(arvados.Container) int
+ CrashRunningContainer func(arvados.Container)
sis *StubInstanceSet
id cloud.InstanceID
initCommand cloud.InitCommand
providerType string
SSHService SSHService
- running map[string]bool
+ running map[string]int64
+ killing map[string]bool
+ lastPID int64
sync.Mutex
}
}
for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
if stdinKV[name] == "" {
- fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdin)
+ fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
return 1
}
}
svm.Lock()
- if svm.running == nil {
- svm.running = map[string]bool{}
- }
- svm.running[uuid] = true
+ svm.lastPID++
+ pid := svm.lastPID
+ svm.running[uuid] = pid
svm.Unlock()
time.Sleep(svm.CrunchRunDetachDelay)
fmt.Fprintf(stderr, "starting %s\n", uuid)
logger := svm.sis.logger.WithFields(logrus.Fields{
"Instance": svm.id,
"ContainerUUID": uuid,
+ "PID": pid,
})
logger.Printf("[test] starting crunch-run stub")
go func() {
logger.Print("[test] container not in queue")
return
}
+
+ defer func() {
+ if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil {
+ svm.CrashRunningContainer(ctr)
+ }
+ }()
+
if crashluck > svm.CrunchRunCrashRate/2 {
time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
ctr.State = arvados.ContainerStateRunning
- queue.Notify(ctr)
+ if !queue.Notify(ctr) {
+ ctr, _ = queue.Get(uuid)
+ logger.Print("[test] erroring out because state=Running update was rejected")
+ return
+ }
}
time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+
svm.Lock()
- _, running := svm.running[uuid]
- svm.Unlock()
- if !running {
+ defer svm.Unlock()
+ if svm.running[uuid] != pid {
logger.Print("[test] container was killed")
return
}
- if svm.ExecuteContainer != nil {
- ctr.ExitCode = svm.ExecuteContainer(ctr)
- }
- // TODO: Check whether the stub instance has
- // been destroyed, and if so, don't call
- // queue.Notify. Then "container finished
- // twice" can be classified as a bug.
+ delete(svm.running, uuid)
+
if crashluck < svm.CrunchRunCrashRate {
- logger.Print("[test] crashing crunch-run stub")
+ logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
} else {
+ if svm.ExecuteContainer != nil {
+ ctr.ExitCode = svm.ExecuteContainer(ctr)
+ }
+ logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub")
ctr.State = arvados.ContainerStateComplete
- queue.Notify(ctr)
+ go queue.Notify(ctr)
}
- logger.Print("[test] exiting crunch-run stub")
- svm.Lock()
- defer svm.Unlock()
- delete(svm.running, uuid)
}()
return 0
}
for uuid := range svm.running {
fmt.Fprintf(stdout, "%s\n", uuid)
}
+ if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
+ fmt.Fprintln(stdout, "broken")
+ }
return 0
}
if strings.HasPrefix(command, "crunch-run --kill ") {
svm.Lock()
- defer svm.Unlock()
- if svm.running[uuid] {
- delete(svm.running, uuid)
+ pid, running := svm.running[uuid]
+ if running && !svm.killing[uuid] {
+ svm.killing[uuid] = true
+ go func() {
+ time.Sleep(time.Duration(math_rand.Float64()*30) * time.Millisecond)
+ svm.Lock()
+ defer svm.Unlock()
+ if svm.running[uuid] == pid {
+ // Kill only if the running entry
+ // hasn't since been killed and
+ // replaced with a different one.
+ delete(svm.running, uuid)
+ }
+ delete(svm.killing, uuid)
+ }()
+ svm.Unlock()
+ time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
+ svm.Lock()
+ _, running = svm.running[uuid]
+ }
+ svm.Unlock()
+ if running {
+ fmt.Fprintf(stderr, "%s: container is running\n", uuid)
+ return 1
} else {
fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+ return 0
}
- return 0
}
if command == "true" {
return 0
defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
defaultTimeoutShutdown = time.Second * 10
+ defaultTimeoutTERM = time.Minute * 2
+ defaultTimeoutSignal = time.Second * 5
// Time after a quota error to try again anyway, even if no
// instances have been shutdown.
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+ timeoutSignal: duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
installPublicKey: installPublicKey,
stop: make(chan bool),
}
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
installPublicKey ssh.PublicKey
// private state
if !ok {
return errors.New("requested instance does not exist")
}
- wkr.idleBehavior = idleBehavior
- wkr.saveTags()
- wkr.shutdownIfIdle()
+ wkr.setIdleBehavior(idleBehavior)
return nil
}
probed: now,
busy: now,
updated: now,
- running: make(map[string]struct{}),
- starting: make(map[string]struct{}),
+ running: make(map[string]*remoteRunner),
+ starting: make(map[string]*remoteRunner),
probing: make(chan struct{}, 1),
}
wp.workers[id] = wkr
return wkr, true
}
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
- wp.exited[uuid] = t
-}
-
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
}
// CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
wp.setupOnce.Do(wp.setup)
+ wp.waitUntilLoaded()
wp.mtx.Lock()
defer wp.mtx.Unlock()
r := map[State]int{}
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
+ logger := wp.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "Reason": reason,
+ })
if _, ok := wp.exited[uuid]; ok {
- wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+ logger.Debug("clearing placeholder for exited crunch-run process")
delete(wp.exited, uuid)
return
}
for _, wkr := range wp.workers {
- if _, ok := wkr.running[uuid]; ok {
- go wp.kill(wkr, uuid)
- return
+ rr := wkr.running[uuid]
+ if rr == nil {
+ rr = wkr.starting[uuid]
}
- }
- wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
- logger := wp.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "Instance": wkr.instance.ID(),
- })
- logger.Debug("killing process")
- cmd := "crunch-run --kill 15 " + uuid
- if u := wkr.instance.RemoteUser(); u != "root" {
- cmd = "sudo " + cmd
- }
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
- if err != nil {
- logger.WithFields(logrus.Fields{
- "stderr": string(stderr),
- "stdout": string(stdout),
- "error": err,
- }).Warn("kill failed")
- return
- }
- logger.Debug("killing process succeeded")
- wp.mtx.Lock()
- defer wp.mtx.Unlock()
- if _, ok := wkr.running[uuid]; ok {
- delete(wkr.running, uuid)
- if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
- wkr.state = StateIdle
+ if rr != nil {
+ rr.Kill(reason)
+ return
}
- wkr.updated = time.Now()
- go wp.notify()
}
+ logger.Debug("cannot kill: already disappeared")
}
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
return r
}
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+ wkr, ok := wp.workers[id]
+ if !ok {
+ return errors.New("instance not found")
+ }
+ wkr.logger.WithField("Reason", reason).Info("shutting down")
+ wkr.shutdown()
+ return nil
+}
+
func (wp *Pool) setup() {
wp.creating = map[string]createCall{}
wp.exited = map[string]time.Time{}
})
logger.Info("instance disappeared in cloud")
delete(wp.workers, id)
- go wkr.executor.Close()
+ go wkr.Close()
notify = true
}
if !wp.loaded {
+ notify = true
wp.loaded = true
wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
}
}
}
+func (wp *Pool) waitUntilLoaded() {
+ ch := wp.Subscribe()
+ wp.mtx.RLock()
+ defer wp.mtx.RUnlock()
+ for !wp.loaded {
+ wp.mtx.RUnlock()
+ <-ch
+ wp.mtx.RLock()
+ }
+}
+
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "syscall"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
+)
+
+// remoteRunner handles the starting and stopping of a crunch-run
+// process on a remote machine.
+type remoteRunner struct {
+ uuid string
+ executor Executor
+ arvClient *arvados.Client
+ remoteUser string
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
+ onUnkillable func(uuid string) // callback invoked when giving up on SIGTERM
+ onKilled func(uuid string) // callback invoked when process exits after SIGTERM
+ logger logrus.FieldLogger
+
+ stopping bool // true if Stop() has been called
+ givenup bool // true if timeoutTERM has been reached
+ closed chan struct{} // channel is closed if Close() has been called
+}
+
+// newRemoteRunner returns a new remoteRunner. Caller should ensure
+// Close() is called to release resources.
+func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+ rr := &remoteRunner{
+ uuid: uuid,
+ executor: wkr.executor,
+ arvClient: wkr.wp.arvClient,
+ remoteUser: wkr.instance.RemoteUser(),
+ timeoutTERM: wkr.wp.timeoutTERM,
+ timeoutSignal: wkr.wp.timeoutSignal,
+ onUnkillable: wkr.onUnkillable,
+ onKilled: wkr.onKilled,
+ logger: wkr.logger.WithField("ContainerUUID", uuid),
+ closed: make(chan struct{}),
+ }
+ return rr
+}
+
+// Start a crunch-run process on the remote host.
+//
+// Start does not return any error encountered. The caller should
+// assume the remote process _might_ have started, at least until it
+// probes the worker and finds otherwise.
+func (rr *remoteRunner) Start() {
+ env := map[string]string{
+ "ARVADOS_API_HOST": rr.arvClient.APIHost,
+ "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
+ }
+ if rr.arvClient.Insecure {
+ env["ARVADOS_API_HOST_INSECURE"] = "1"
+ }
+ envJSON, err := json.Marshal(env)
+ if err != nil {
+ panic(err)
+ }
+ stdin := bytes.NewBuffer(envJSON)
+ cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+ if rr.remoteUser != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
+ if err != nil {
+ rr.logger.WithField("stdout", string(stdout)).
+ WithField("stderr", string(stderr)).
+ WithError(err).
+ Error("error starting crunch-run process")
+ return
+ }
+ rr.logger.Info("crunch-run process started")
+}
+
+// Close abandons the remote process (if any) and releases
+// resources. Close must not be called more than once.
+func (rr *remoteRunner) Close() {
+ close(rr.closed)
+}
+
+// Kill starts a background task to kill the remote process, first
+// trying SIGTERM until reaching timeoutTERM, then calling
+// onUnkillable().
+//
+// SIGKILL is not used. It would merely kill the crunch-run supervisor
+// and thereby make the docker container, arv-mount, etc. invisible to
+// us without actually stopping them.
+//
+// Once Kill has been called, calling it again has no effect.
+func (rr *remoteRunner) Kill(reason string) {
+ if rr.stopping {
+ return
+ }
+ rr.stopping = true
+ rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
+ go func() {
+ termDeadline := time.Now().Add(rr.timeoutTERM)
+ t := time.NewTicker(rr.timeoutSignal)
+ defer t.Stop()
+ for range t.C {
+ switch {
+ case rr.isClosed():
+ return
+ case time.Now().After(termDeadline):
+ rr.logger.Debug("giving up")
+ rr.givenup = true
+ rr.onUnkillable(rr.uuid)
+ return
+ default:
+ rr.kill(syscall.SIGTERM)
+ }
+ }
+ }()
+}
+
+func (rr *remoteRunner) kill(sig syscall.Signal) {
+ logger := rr.logger.WithField("Signal", int(sig))
+ logger.Info("sending signal")
+ cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+ if rr.remoteUser != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
+ if err != nil {
+ logger.WithFields(logrus.Fields{
+ "stderr": string(stderr),
+ "stdout": string(stdout),
+ "error": err,
+ }).Info("kill attempt unsuccessful")
+ return
+ }
+ rr.onKilled(rr.uuid)
+}
+
+func (rr *remoteRunner) isClosed() bool {
+ select {
+ case <-rr.closed:
+ return true
+ default:
+ return false
+ }
+}
package worker
import (
- "bytes"
- "encoding/json"
"fmt"
"strings"
"sync"
busy time.Time
destroyed time.Time
lastUUID string
- running map[string]struct{} // remember to update state idle<->running when this changes
- starting map[string]struct{} // remember to update state idle<->running when this changes
+ running map[string]*remoteRunner // remember to update state idle<->running when this changes
+ starting map[string]*remoteRunner // remember to update state idle<->running when this changes
probing chan struct{}
}
+func (wkr *worker) onUnkillable(uuid string) {
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ logger := wkr.logger.WithField("ContainerUUID", uuid)
+ if wkr.idleBehavior == IdleBehaviorHold {
+ logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+ return
+ }
+ logger.Warn("unkillable container, draining worker")
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ wkr.closeRunner(uuid)
+ go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+ wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+ wkr.idleBehavior = idleBehavior
+ wkr.saveTags()
+ wkr.shutdownIfIdle()
+}
+
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
logger := wkr.logger.WithFields(logrus.Fields{
"ContainerUUID": ctr.UUID,
"Priority": ctr.Priority,
})
- logger = logger.WithField("Instance", wkr.instance.ID())
logger.Debug("starting container")
- wkr.starting[ctr.UUID] = struct{}{}
+ rr := newRemoteRunner(ctr.UUID, wkr)
+ wkr.starting[ctr.UUID] = rr
if wkr.state != StateRunning {
wkr.state = StateRunning
go wkr.wp.notify()
}
go func() {
- env := map[string]string{
- "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
- "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
- }
- if wkr.wp.arvClient.Insecure {
- env["ARVADOS_API_HOST_INSECURE"] = "1"
- }
- envJSON, err := json.Marshal(env)
- if err != nil {
- panic(err)
- }
- stdin := bytes.NewBuffer(envJSON)
- cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
- if u := wkr.instance.RemoteUser(); u != "root" {
- cmd = "sudo " + cmd
- }
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
+ rr.Start()
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
now := time.Now()
wkr.updated = now
wkr.busy = now
delete(wkr.starting, ctr.UUID)
- wkr.running[ctr.UUID] = struct{}{}
- wkr.lastUUID = ctr.UUID
- if err != nil {
- logger.WithField("stdout", string(stdout)).
- WithField("stderr", string(stderr)).
- WithError(err).
- Error("error starting crunch-run process")
- // Leave uuid in wkr.running, though: it's
- // possible the error was just a communication
- // failure and the process was in fact
- // started. Wait for next probe to find out.
- return
- }
- logger.Info("crunch-run process started")
+ wkr.running[ctr.UUID] = rr
wkr.lastUUID = ctr.UUID
}()
}
logger.Info("instance booted; will try probeRunning")
}
}
+ reportedBroken := false
if booted || wkr.state == StateUnknown {
- ctrUUIDs, ok = wkr.probeRunning()
+ ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
}
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
+ if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+ logger.Info("probe reported broken instance")
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+ }
if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
if wkr.state == StateShutdown && wkr.updated.After(updated) {
// Skip the logging noise if shutdown was
// advantage of the non-busy state, though.
wkr.busy = updateTime
}
- changed := false
- // Build a new "running" map. Set changed=true if it differs
- // from the existing map (wkr.running) to ensure the scheduler
- // gets notified below.
- running := map[string]struct{}{}
- for _, uuid := range ctrUUIDs {
- running[uuid] = struct{}{}
- if _, ok := wkr.running[uuid]; !ok {
- if _, ok := wkr.starting[uuid]; !ok {
- // We didn't start it -- it must have
- // been started by a previous
- // dispatcher process.
- logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
- }
- changed = true
- }
- }
- for uuid := range wkr.running {
- if _, ok := running[uuid]; !ok {
- logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
- wkr.wp.notifyExited(uuid, updateTime)
- changed = true
- }
- }
+ changed := wkr.updateRunning(ctrUUIDs)
// Update state if this was the first successful boot-probe.
if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
// Log whenever a run-probe reveals crunch-run processes
// appearing/disappearing before boot-probe succeeds.
- if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+ if wkr.state == StateUnknown && changed {
logger.WithFields(logrus.Fields{
- "RunningContainers": len(running),
+ "RunningContainers": len(wkr.running),
"State": wkr.state,
}).Info("crunch-run probe succeeded, but boot probe is still failing")
}
- wkr.running = running
if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
wkr.state = StateRunning
} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
wkr.updated = updateTime
if booted && (initialState == StateUnknown || initialState == StateBooting) {
logger.WithFields(logrus.Fields{
- "RunningContainers": len(running),
+ "RunningContainers": len(wkr.running),
"State": wkr.state,
}).Info("probes succeeded, instance is in service")
}
go wkr.wp.notify()
}
-func (wkr *worker) probeRunning() (running []string, ok bool) {
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
cmd := "crunch-run --list"
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
"stdout": string(stdout),
"stderr": string(stderr),
}).WithError(err).Warn("probe failed")
- return nil, false
+ return
}
- stdout = bytes.TrimRight(stdout, "\n")
- if len(stdout) == 0 {
- return nil, true
+ ok = true
+ for _, s := range strings.Split(string(stdout), "\n") {
+ if s == "broken" {
+ reportsBroken = true
+ } else if s != "" {
+ running = append(running, s)
+ }
}
- return strings.Split(string(stdout), "\n"), true
+ return
}
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
return true
}
+// Returns true if the instance is eligible for shutdown: either it's
+// been idle too long, or idleBehavior=Drain and all remaining runners have given up.
+//
// caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
if wkr.idleBehavior == IdleBehaviorHold {
- // Never shut down.
return false
}
- age := time.Since(wkr.busy)
-
- old := age >= wkr.wp.timeoutIdle
draining := wkr.idleBehavior == IdleBehaviorDrain
- shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
- (draining && wkr.state == StateBooting)
- if !shouldShutdown {
+ switch wkr.state {
+ case StateBooting:
+ return draining
+ case StateIdle:
+ return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+ case StateRunning:
+ if !draining {
+ return false
+ }
+ for _, rr := range wkr.running {
+ if !rr.givenup {
+ return false
+ }
+ }
+ for _, rr := range wkr.starting {
+ if !rr.givenup {
+ return false
+ }
+ }
+ // draining, and all remaining runners are just trying
+ // to force-kill their crunch-run procs
+ return true
+ default:
return false
}
+}
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+ if !wkr.eligibleForShutdown() {
+ return false
+ }
wkr.logger.WithFields(logrus.Fields{
"State": wkr.state,
- "IdleDuration": stats.Duration(age),
+ "IdleDuration": stats.Duration(time.Since(wkr.busy)),
"IdleBehavior": wkr.idleBehavior,
- }).Info("shutdown idle worker")
+ }).Info("shutdown worker")
wkr.shutdown()
return true
}
}()
}
}
+
+func (wkr *worker) Close() {
+ // This might take time, so do it after unlocking mtx.
+ defer wkr.executor.Close()
+
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ for uuid, rr := range wkr.running {
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+ rr.Close()
+ }
+ for uuid, rr := range wkr.starting {
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+ rr.Close()
+ }
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+ alive := map[string]bool{}
+ for _, uuid := range ctrUUIDs {
+ alive[uuid] = true
+ if _, ok := wkr.running[uuid]; ok {
+ // unchanged
+ } else if rr, ok := wkr.starting[uuid]; ok {
+ wkr.running[uuid] = rr
+ delete(wkr.starting, uuid)
+ changed = true
+ } else {
+ // We didn't start it -- it must have been
+ // started by a previous dispatcher process.
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+ wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+ changed = true
+ }
+ }
+ for uuid := range wkr.running {
+ if !alive[uuid] {
+ wkr.closeRunner(uuid)
+ changed = true
+ }
+ }
+ return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+ rr := wkr.running[uuid]
+ if rr == nil {
+ return
+ }
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+ delete(wkr.running, uuid)
+ rr.Close()
+
+ now := time.Now()
+ wkr.updated = now
+ wkr.wp.exited[uuid] = now
+ if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+ wkr.state = StateIdle
+ }
+}
busy: ctime,
probed: ctime,
updated: ctime,
+ running: map[string]*remoteRunner{},
+ starting: map[string]*remoteRunner{},
+ probing: make(chan struct{}, 1),
}
if trial.running > 0 {
- wkr.running = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+ uuid := "zzzzz-dz642-abcdefghijklmno"
+ wkr.running = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
}
if trial.starting > 0 {
- wkr.starting = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+ uuid := "zzzzz-dz642-bcdefghijklmnop"
+ wkr.starting = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
}
wkr.probeAndUpdate()
c.Check(wkr.state, check.Equals, trial.expectState)
"fmt"
"io"
"net/http"
+ "net/url"
"os"
"git.curoverse.com/arvados.git/lib/cmd"
CheckHealth() error
}
-type NewHandlerFunc func(context.Context, *arvados.Cluster, *arvados.NodeProfile) Handler
+type NewHandlerFunc func(_ context.Context, _ *arvados.Cluster, _ *arvados.NodeProfile, token string) Handler
type command struct {
newHandler NewHandlerFunc
svcName arvados.ServiceName
+ ctx context.Context // enables tests to shutdown service; no public API yet
}
// Command returns a cmd.Handler that loads site config, calls
return &command{
newHandler: newHandler,
svcName: svcName,
+ ctx: context.Background(),
}
}
log = ctxlog.New(stderr, cluster.Logging.Format, cluster.Logging.Level).WithFields(logrus.Fields{
"PID": os.Getpid(),
})
- ctx := ctxlog.Context(context.Background(), log)
+ ctx := ctxlog.Context(c.ctx, log)
+
profileName := *nodeProfile
if profileName == "" {
profileName = os.Getenv("ARVADOS_NODE_PROFILE")
err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName)
return 1
}
- handler := c.newHandler(ctx, cluster, profile)
+
+ if cluster.SystemRootToken == "" {
+ log.Warn("SystemRootToken missing from cluster config, falling back to ARVADOS_API_TOKEN environment variable")
+ cluster.SystemRootToken = os.Getenv("ARVADOS_API_TOKEN")
+ }
+ if cluster.Services.Controller.ExternalURL.Host == "" {
+ log.Warn("Services.Controller.ExternalURL missing from cluster config, falling back to ARVADOS_API_HOST(_INSECURE) environment variables")
+ u, err := url.Parse("https://" + os.Getenv("ARVADOS_API_HOST"))
+ if err != nil {
+ err = fmt.Errorf("ARVADOS_API_HOST: %s", err)
+ return 1
+ }
+ cluster.Services.Controller.ExternalURL = arvados.URL(*u)
+ if i := os.Getenv("ARVADOS_API_HOST_INSECURE"); i != "" && i != "0" {
+ cluster.TLS.Insecure = true
+ }
+ }
+
+ handler := c.newHandler(ctx, cluster, profile, cluster.SystemRootToken)
if err = handler.CheckHealth(); err != nil {
return 1
}
if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.WithError(err).Errorf("error notifying init daemon")
}
+ go func() {
+ <-ctx.Done()
+ srv.Close()
+ }()
err = srv.Wait()
if err != nil {
return 1
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package service provides a cmd.Handler that brings up a system service.
+package service
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "testing"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (*Suite) TestCommand(c *check.C) {
+ cf, err := ioutil.TempFile("", "cmd_test.")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(cf.Name())
+ defer cf.Close()
+ fmt.Fprintf(cf, "Clusters:\n zzzzz:\n SystemRootToken: abcde\n NodeProfiles: {\"*\": {\"arvados-controller\": {Listen: \":1234\"}}}")
+
+ healthCheck := make(chan bool, 1)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, _ *arvados.NodeProfile, token string) Handler {
+ c.Check(ctx.Value("foo"), check.Equals, "bar")
+ c.Check(token, check.Equals, "abcde")
+ return &testHandler{ctx: ctx, healthCheck: healthCheck}
+ })
+ cmd.(*command).ctx = context.WithValue(ctx, "foo", "bar")
+
+ done := make(chan bool)
+ var stdin, stdout, stderr bytes.Buffer
+
+ go func() {
+ cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
+ close(done)
+ }()
+ select {
+ case <-healthCheck:
+ case <-done:
+ c.Error("command exited without health check")
+ }
+ cancel()
+ c.Check(stdout.String(), check.Equals, "")
+ c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
+}
+
+type testHandler struct {
+ ctx context.Context
+ healthCheck chan bool
+}
+
+func (th *testHandler) ServeHTTP(http.ResponseWriter, *http.Request) {}
+func (th *testHandler) CheckHealth() error {
+ ctxlog.FromContext(th.ctx).Info("CheckHealth called")
+ select {
+ case th.healthCheck <- true:
+ default:
+ }
+ return nil
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package service
+
+import (
+ "context"
+ "net/http"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "github.com/sirupsen/logrus"
+)
+
+// ErrorHandler returns a Handler that reports itself as unhealthy and
+// responds 500 to all requests. ErrorHandler itself logs the given
+// error once, and the handler logs it again for each incoming
+// request.
+func ErrorHandler(ctx context.Context, _ *arvados.Cluster, _ *arvados.NodeProfile, err error) Handler {
+ logger := ctxlog.FromContext(ctx)
+ logger.WithError(err).Error("unhealthy service")
+ return errorHandler{err, logger}
+}
+
+type errorHandler struct {
+ err error
+ logger logrus.FieldLogger
+}
+
+func (eh errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ eh.logger.WithError(eh.err).Error("unhealthy service")
+ http.Error(w, "", http.StatusInternalServerError)
+}
+
+func (eh errorHandler) CheckHealth() error {
+ return eh.err
+}
s.add_runtime_dependency 'arvados', '~> 1.3.0', '>= 1.3.0'
# Our google-api-client dependency used to be < 0.9, but that could be
# satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
- s.add_runtime_dependency 'cure-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
- s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5'
+ s.add_runtime_dependency 'arvados-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
+ s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5.1'
s.add_runtime_dependency 'json', '>= 1.7.7', '<3'
s.add_runtime_dependency 'optimist', '~> 3.0'
s.add_runtime_dependency 'andand', '~> 1.3', '>= 1.3.3'
logger.exception("Error creating the Arvados CWL Executor")
return 1
- # Note that unless in debug mode, some stack traces related to user
+ # Note that unless in debug mode, some stack traces related to user
# workflow errors may be suppressed. See ArvadosJob.done().
if arvargs.debug:
logger.setLevel(logging.DEBUG)
"kind": "collection",
"portable_data_hash": pdh
}
+ if pdh in self.pathmapper.pdh_to_uuid:
+ mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
if len(sp) == 2:
if tp == "Directory":
path = sp[1]
else:
processStatus = "permanentFail"
- if processStatus == "permanentFail":
- logc = arvados.collection.CollectionReader(container["log"],
+ if processStatus == "permanentFail" and record["log_uuid"]:
+ logc = arvados.collection.CollectionReader(record["log_uuid"],
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
if container["output"]:
outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
except WorkflowException as e:
- # Only include a stack trace if in debug mode.
- # A stack trace may obfuscate more useful output about the workflow.
+ # Only include a stack trace if in debug mode.
+ # A stack trace may obfuscate more useful output about the workflow.
logger.error("%s unable to collect output from %s:\n%s",
self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
)
finally:
self.updatingRuntimeStatus = False
-
+
class ArvCwlExecutor(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
with final.open("cwl.output.json", "w") as f:
res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
- f.write(res)
+ f.write(res)
final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
del self.collections[pdh]
self.total -= v[1]
- def get(self, pdh):
+ def get(self, locator):
with self.lock:
- if pdh not in self.collections:
- m = pdh_size.match(pdh)
+ if locator not in self.collections:
+ m = pdh_size.match(locator)
if m:
self.cap_cache(int(m.group(2)) * 128)
- logger.debug("Creating collection reader for %s", pdh)
- cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
- keep_client=self.keep_client,
- num_retries=self.num_retries)
+ logger.debug("Creating collection reader for %s", locator)
+ try:
+ cr = arvados.collection.CollectionReader(locator, api_client=self.api_client,
+ keep_client=self.keep_client,
+ num_retries=self.num_retries)
+ except arvados.errors.ApiError as ap:
+ raise IOError(errno.ENOENT, "Could not access collection '%s': %s" % (locator, str(ap._get_reason())))
sz = len(cr.manifest_text()) * 128
- self.collections[pdh] = (cr, sz)
+ self.collections[locator] = (cr, sz)
self.total += sz
else:
- cr, sz = self.collections[pdh]
+ cr, sz = self.collections[locator]
# bump it to the back
- del self.collections[pdh]
- self.collections[pdh] = (cr, sz)
+ del self.collections[locator]
+ self.collections[locator] = (cr, sz)
return cr
def get_collection(self, path):
sp = path.split("/", 1)
p = sp[0]
- if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
- pdh = p[5:]
- return (self.collection_cache.get(pdh), urllib.parse.unquote(sp[1]) if len(sp) == 2 else None)
+ if p.startswith("keep:") and (arvados.util.keep_locator_pattern.match(p[5:]) or
+ arvados.util.collection_uuid_pattern.match(p[5:])):
+ locator = p[5:]
+ return (self.collection_cache.get(locator), urllib.parse.unquote(sp[1]) if len(sp) == 2 else None)
else:
return (None, path)
baseparts = basesp.path.split("/")
urlparts = urlsp.path.split("/") if urlsp.path else []
- pdh = baseparts.pop(0)
+ locator = baseparts.pop(0)
- if basesp.scheme == "keep" and not arvados.util.keep_locator_pattern.match(pdh):
+ if (basesp.scheme == "keep" and
+ (not arvados.util.keep_locator_pattern.match(locator)) and
+ (not arvados.util.collection_uuid_pattern.match(locator))):
raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
if urlsp.path.startswith("/"):
if baseparts and urlsp.path:
baseparts.pop()
- path = "/".join([pdh] + baseparts + urlparts)
+ path = "/".join([locator] + baseparts + urlparts)
return urllib.parse.urlunsplit((basesp.scheme, "", path, "", urlsp.fragment))
return super(CollectionFetcher, self).urljoin(base_url, url)
self.name = name
self.referenced_files = [r["location"] for r in referenced_files]
self.single_collection = single_collection
+ self.pdh_to_uuid = {}
super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
def visit(self, srcobj, uploadfiles):
if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
+ if arvados_cwl.util.collectionUUID in srcobj:
+ self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
debug = logger.isEnabledFor(logging.DEBUG)
import os
import sys
+import re
import urllib.parse
from functools import partial
import logging
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
+import schema_salad.validate as validate
import arvados.collection
+from .util import collectionUUID
import ruamel.yaml as yaml
import arvados_cwl.arvdocker
if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
setSecondary(t, job_order[shortname(t["id"])], discovered)
+collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
+collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
def upload_dependencies(arvrunner, name, document_loader,
workflowobj, uri, loadref_run,
loadref, urljoin=document_loader.fetcher.urljoin)
sc = []
- def only_real(obj):
- # Only interested in local files than need to be uploaded,
- # don't include file literals, keep references, etc.
- sp = obj.get("location", "").split(":")
- if len(sp) > 1 and sp[0] in ("file", "http", "https"):
+ uuids = {}
+
+ def collect_uuids(obj):
+ loc = obj.get("location", "")
+ sp = loc.split(":")
+ if sp[0] == "keep":
+ # Collect collection uuids that need to be resolved to
+ # portable data hashes
+ gp = collection_uuid_pattern.match(loc)
+ if gp:
+ uuids[gp.groups()[0]] = obj
+ if collectionUUID in obj:
+ uuids[obj[collectionUUID]] = obj
+
+ def collect_uploads(obj):
+ loc = obj.get("location", "")
+ sp = loc.split(":")
+ if len(sp) < 1:
+ return
+ if sp[0] in ("file", "http", "https"):
+ # Record local files than need to be uploaded,
+ # don't include file literals, keep references, etc.
sc.append(obj)
+ collect_uuids(obj)
- visit_class(sc_result, ("File", "Directory"), only_real)
+ visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+ visit_class(sc_result, ("File", "Directory"), collect_uploads)
+
+ # Resolve any collection uuids we found to portable data hashes
+ # and assign them to uuid_map
+ uuid_map = {}
+ fetch_uuids = list(uuids.keys())
+ while fetch_uuids:
+ # For a large number of fetch_uuids, API server may limit
+ # response size, so keep fetching until API server has nothing
+ # more to give us.
+ lookups = arvrunner.api.collections().list(
+ filters=[["uuid", "in", fetch_uuids]],
+ count="none",
+ select=["uuid", "portable_data_hash"]).execute(
+ num_retries=arvrunner.num_retries)
+
+ if not lookups["items"]:
+ break
+
+ for l in lookups["items"]:
+ uuid_map[l["uuid"]] = l["portable_data_hash"]
+
+ fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
normalizeFilesDirs(sc)
single_collection=True)
def setloc(p):
- if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+ loc = p.get("location")
+ if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
p["location"] = mapper.mapper(p["location"]).resolved
+ return
+
+ if not loc:
+ return
+
+ if collectionUUID in p:
+ uuid = p[collectionUUID]
+ if uuid not in uuid_map:
+ raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+ "Collection uuid %s not found" % uuid)
+ gp = collection_pdh_pattern.match(loc)
+ if gp and uuid_map[uuid] != gp.groups()[0]:
+ # This file entry has both collectionUUID and a PDH
+ # location. If the PDH doesn't match the one returned
+ # the API server, raise an error.
+ raise SourceLine(p, "location", validate.ValidationException).makeError(
+ "Expected collection uuid %s to be %s but API server reported %s" % (
+ uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
+ gp = collection_uuid_pattern.match(loc)
+ if not gp:
+ return
+ uuid = gp.groups()[0]
+ if uuid not in uuid_map:
+ raise SourceLine(p, "location", validate.ValidationException).makeError(
+ "Collection uuid %s not found" % uuid)
+ p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+ p[collectionUUID] = uuid
visit_class(workflowobj, ("File", "Directory"), setloc)
visit_class(discovered, ("File", "Directory"), setloc)
import datetime
from arvados.errors import ApiError
+collectionUUID = "http://arvados.org/cwl#collectionUUID"
+
def get_intermediate_collection_info(workflow_step_name, current_container, intermediate_output_ttl):
if workflow_step_name:
name = "Intermediate collection for step %s" % (workflow_step_name)
if logger:
logger.info("Getting current container: %s", e)
raise e
-
+
return current_container
--- /dev/null
+{
+ "x": {
+ "class": "File",
+ "path": "input/blorp.txt"
+ },
+ "y": {
+ "class": "Directory",
+ "location": "keep:99999999999999999999999999999998+99",
+ "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "listing": [{
+ "class": "File",
+ "location": "keep:99999999999999999999999999999997+99/file1.txt",
+ "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
+ }]
+ },
+ "z": {
+ "class": "Directory",
+ "basename": "anonymous",
+ "listing": [{
+ "basename": "renamed.txt",
+ "class": "File",
+ "location": "keep:99999999999999999999999999999998+99/file1.txt"
+ }]
+ }
+}
--- /dev/null
+{
+ "x": {
+ "class": "File",
+ "path": "input/blorp.txt"
+ },
+ "y": {
+ "class": "Directory",
+ "location": "keep:99999999999999999999999999999998+99",
+ "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "listing": [{
+ "class": "File",
+ "location": "keep:99999999999999999999999999999998+99/file1.txt",
+ "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
+ }]
+ },
+ "z": {
+ "class": "Directory",
+ "basename": "anonymous",
+ "listing": [{
+ "basename": "renamed.txt",
+ "class": "File",
+ "location": "keep:99999999999999999999999999999998+99/file1.txt",
+ "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
+ }]
+ }
+}
--- /dev/null
+{
+ "x": {
+ "class": "File",
+ "path": "input/blorp.txt"
+ },
+ "y": {
+ "class": "Directory",
+ "location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "listing": [{
+ "class": "File",
+ "location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/file1.txt"
+ }]
+ },
+ "z": {
+ "class": "Directory",
+ "basename": "anonymous",
+ "listing": [{
+ "basename": "renamed.txt",
+ "class": "File",
+ "location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/file1.txt"
+ }]
+ }
+}
return loadingContext, runtimeContext
- # Helper function to set up the ArvCwlExecutor to use the containers api
+ # Helper function to set up the ArvCwlExecutor to use the containers api
# and test that the RuntimeStatusLoggingHandler is set up correctly
def setup_and_test_container_executor_and_logging(self, gcc_mock) :
api = mock.MagicMock()
handlerClasses = [h.__class__ for h in root_logger.handlers]
self.assertTrue(arvados_cwl.RuntimeStatusLoggingHandler in handlerClasses)
return runner
-
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
# get_current_container is invoked when we call runtime_status_update
# so try and log again!
gcc_mock.side_effect = lambda *args: root_logger.error("Second Error")
- try:
+ try:
root_logger.error("First Error")
- except RuntimeError:
+ except RuntimeError:
self.fail("RuntimeStatusLoggingHandler should not be called recursively")
@mock.patch("arvados_cwl.ArvCwlExecutor.runtime_status_update")
@mock.patch("arvados.collection.Collection")
def test_child_failure(self, col, reader, gcc_mock, rts_mock):
runner = self.setup_and_test_container_executor_and_logging(gcc_mock)
-
+
gcc_mock.return_value = {"uuid" : "zzzzz-dz642-zzzzzzzzzzzzzzz"}
self.assertTrue(gcc_mock.called)
"p1": {
"class": "Directory",
"location": "keep:99999999999999999999999999999994+44",
+ "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
"listing": [
{
"class": "File",
'mounts': {
"/keep/99999999999999999999999999999994+44": {
"kind": "collection",
- "portable_data_hash": "99999999999999999999999999999994+44"
+ "portable_data_hash": "99999999999999999999999999999994+44",
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
},
'/tmp': {'kind': 'tmp',
"capacity": 1073741824 },
import sys
import unittest
-from io import BytesIO, StringIO
+from io import BytesIO
+
+# StringIO.StringIO and io.StringIO have different behavior when write() is
+# called with both python2 (byte) strings and unicode strings
+# (specifically there's some logging in cwltool that causes trouble).
+# This isn't a problem on python3 because all string are unicode.
+if sys.version_info[0] < 3:
+ from StringIO import StringIO
+else:
+ from io import StringIO
import arvados
import arvados.collection
"portable_data_hash": "99999999999999999999999999999998+99",
"manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
},
+ "99999999999999999999999999999997+99": {
+ "uuid": "",
+ "portable_data_hash": "99999999999999999999999999999997+99",
+ "manifest_text": ". 99999999999999999999999999999997+99 0:0:file1.txt"
+ },
"99999999999999999999999999999994+99": {
"uuid": "",
"portable_data_hash": "99999999999999999999999999999994+99",
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
self.assertEqual(exited, 1)
+ @mock.patch("arvados.collection.CollectionReader")
+ @stubs
+ def test_submit_uuid_inputs(self, stubs, collectionReader):
+ collectionReader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "file1.txt")
+ def list_side_effect(**kwargs):
+ m = mock.MagicMock()
+ if "count" in kwargs:
+ m.execute.return_value = {"items": [
+ {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz", "portable_data_hash": "99999999999999999999999999999998+99"}
+ ]}
+ else:
+ m.execute.return_value = {"items": []}
+ return m
+ stubs.api.collections().list.side_effect = list_side_effect
+
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job_with_uuids.json"],
+ stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['y']['basename'] = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
+ expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['y']['http://arvados.org/cwl#collectionUUID'] = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
+ expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['z']['listing'][0]['http://arvados.org/cwl#collectionUUID'] = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
+
+ stubs.api.collections().list.assert_has_calls([
+ mock.call(count='none',
+ filters=[['uuid', 'in', ['zzzzz-4zz18-zzzzzzzzzzzzzzz']]],
+ select=['uuid', 'portable_data_hash'])])
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(stubs.capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+ self.assertEqual(exited, 0)
+
+ @stubs
+ def test_submit_mismatched_uuid_inputs(self, stubs):
+ def list_side_effect(**kwargs):
+ m = mock.MagicMock()
+ if "count" in kwargs:
+ m.execute.return_value = {"items": [
+ {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz", "portable_data_hash": "99999999999999999999999999999997+99"}
+ ]}
+ else:
+ m.execute.return_value = {"items": []}
+ return m
+ stubs.api.collections().list.side_effect = list_side_effect
+
+ for infile in ("tests/submit_test_job_with_mismatched_uuids.json", "tests/submit_test_job_with_inconsistent_uuids.json"):
+ capture_stderr = StringIO()
+ cwltool_logger = logging.getLogger('cwltool')
+ stderr_logger = logging.StreamHandler(capture_stderr)
+ cwltool_logger.addHandler(stderr_logger)
+
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug",
+ "tests/wf/submit_wf.cwl", infile],
+ stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+ self.assertEqual(exited, 1)
+ self.assertRegexpMatches(
+ capture_stderr.getvalue(),
+ r"Expected collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz to be 99999999999999999999999999999998\+99 but API server reported 99999999999999999999999999999997\+99")
+ finally:
+ cwltool_logger.removeHandler(stderr_logger)
+
+ @mock.patch("arvados.collection.CollectionReader")
+ @stubs
+ def test_submit_unknown_uuid_inputs(self, stubs, collectionReader):
+ collectionReader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "file1.txt")
+ capture_stderr = StringIO()
+
+ cwltool_logger = logging.getLogger('cwltool')
+ stderr_logger = logging.StreamHandler(capture_stderr)
+ cwltool_logger.addHandler(stderr_logger)
+
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job_with_uuids.json"],
+ stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+ try:
+ self.assertEqual(exited, 1)
+ self.assertRegexpMatches(
+ capture_stderr.getvalue(),
+ r"Collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz not found")
+ finally:
+ cwltool_logger.removeHandler(stderr_logger)
+
class TestCreateTemplate(unittest.TestCase):
existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
@stubs
def test_incompatible_api(self, stubs):
- capture_stderr = io.StringIO()
+ capture_stderr = StringIO()
acr_logger = logging.getLogger('arvados.cwl-runner')
stderr_logger = logging.StreamHandler(capture_stderr)
acr_logger.addHandler(stderr_logger)
- exited = arvados_cwl.main(
- ["--update-workflow", self.existing_workflow_uuid,
- "--api=jobs",
- "--debug",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- sys.stderr, sys.stderr, api_client=stubs.api)
- self.assertEqual(exited, 1)
- self.assertRegexpMatches(
- capture_stderr.getvalue(),
- "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
- acr_logger.removeHandler(stderr_logger)
+ try:
+ exited = arvados_cwl.main(
+ ["--update-workflow", self.existing_workflow_uuid,
+ "--api=jobs",
+ "--debug",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ sys.stderr, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 1)
+ self.assertRegexpMatches(
+ capture_stderr.getvalue(),
+ "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
+ finally:
+ acr_logger.removeHandler(stderr_logger)
@stubs
def test_update(self, stubs):
var DefaultSecureClient = &http.Client{
Timeout: 5 * time.Minute}
+// NewClientFromConfig creates a new Client that uses the endpoints in
+// the given cluster.
+//
+// AuthToken is left empty for the caller to populate.
+func NewClientFromConfig(cluster *Cluster) (*Client, error) {
+ ctrlURL := cluster.Services.Controller.ExternalURL
+ if ctrlURL.Host == "" {
+ return nil, fmt.Errorf("no host in config Services.Controller.ExternalURL: %v", ctrlURL)
+ }
+ return &Client{
+ APIHost: fmt.Sprintf("%v", ctrlURL),
+ Insecure: cluster.TLS.Insecure,
+ }, nil
+}
+
// NewClientFromEnv creates a new Client that uses the default HTTP
// client with the API endpoint and credentials given by the
// ARVADOS_API_* environment variables.
"encoding/json"
"errors"
"fmt"
+ "net/url"
"os"
"git.curoverse.com/arvados.git/sdk/go/config"
type Cluster struct {
ClusterID string `json:"-"`
ManagementToken string
+ SystemRootToken string
+ Services Services
NodeProfiles map[string]NodeProfile
InstanceTypes InstanceTypeMap
CloudVMs CloudVMs
PostgreSQL PostgreSQL
RequestLimits RequestLimits
Logging Logging
+ TLS TLS
}
+type Services struct {
+ Controller Service
+ DispatchCloud Service
+ Health Service
+ Keepbalance Service
+ Keepproxy Service
+ Keepstore Service
+ Keepweb Service
+ Nodemanager Service
+ RailsAPI Service
+ Websocket Service
+ Workbench Service
+}
+
+type Service struct {
+ InternalURLs map[URL]ServiceInstance
+ ExternalURL URL
+}
+
+// URL is a url.URL that is also usable as a JSON key/value.
+type URL url.URL
+
+// UnmarshalText implements encoding.TextUnmarshaler so URL can be
+// used as a JSON key/value.
+func (su *URL) UnmarshalText(text []byte) error {
+ u, err := url.Parse(string(text))
+ if err == nil {
+ *su = URL(*u)
+ }
+ return err
+}
+
+type ServiceInstance struct{}
+
type Logging struct {
Level string
Format string
// Maximum total worker probes per second
MaxProbesPerSecond int
+
+ // Time before repeating SIGTERM when killing a container
+ TimeoutSignal Duration
+
+ // Time to give up on SIGTERM and write off the worker
+ TimeoutTERM Duration
}
type CloudVMs struct {
// Time after shutdown to retry shutdown
TimeoutShutdown Duration
+ // Maximum create/destroy-instance operations per second
+ MaxCloudOpsPerSecond int
+
ImageID string
Driver string
TLS bool
Insecure bool
}
+
+type TLS struct {
+ Certificate string
+ Key string
+ Insecure bool
+}
import (
"encoding/json"
+ "errors"
"fmt"
"log"
"os"
}
svcListCacheMtx.Unlock()
- return kc.loadKeepServers(<-cacheEnt.latest)
+ select {
+ case <-time.After(time.Minute):
+ return errors.New("timed out while getting initial list of keep services")
+ case sl := <-cacheEnt.latest:
+ return kc.loadKeepServers(sl)
+ }
}
func (kc *KeepClient) RefreshServiceDiscovery() {
s.add_dependency('andand', '~> 1.3', '>= 1.3.3')
# Our google-api-client dependency used to be < 0.9, but that could be
# satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
- s.add_dependency('cure-google-api-client', '>= 0.7', '< 0.8.9')
+ s.add_dependency('arvados-google-api-client', '>= 0.7', '< 0.8.9')
# work around undeclared dependency on i18n in some activesupport 3.x.x:
s.add_dependency('i18n', '~> 0')
s.add_dependency('json', '>= 1.7.7', '<3')
assert_includes coll_uuids, collections(:foo_collection_in_aproject).uuid
assert_not_includes coll_uuids, collections(:expired_collection).uuid
end
+end
+
+class NonTransactionalGroupsTest < ActionDispatch::IntegrationTest
+ # Transactional tests are disabled to be able to test the concurrent
+ # asynchronous permissions update feature.
+ # This is needed because nested transactions share the connection pool, so
+ # one thread is locked while trying to talk to the database, until the other
+ # one finishes.
+ self.use_transactional_fixtures = false
+
+ teardown do
+ # Explicitly reset the database after each test.
+ post '/database/reset', {}, auth(:admin)
+ assert_response :success
+ end
test "create request with async=true defers permissions update" do
- Rails.configuration.async_permissions_update_interval = 1 # seconds
+ Rails.configuration.async_permissions_update_interval = 1 # second
name = "Random group #{rand(1000)}"
assert_equal nil, Group.find_by_name(name)
+
+ # Trigger the asynchronous permission update by using async=true parameter.
post "/arvados/v1/groups", {
group: {
name: name
async: true
}, auth(:active)
assert_response 202
- g = Group.find_by_name(name)
- assert_not_nil g
+
+ # The group exists in the database, but it's not accessible yet.
+ assert_not_nil Group.find_by_name(name)
get "/arvados/v1/groups", {
filters: [["name", "=", name]].to_json,
limit: 10
assert_response 200
assert_equal 0, json_response['items_available']
- # Unblock the thread doing the permissions update
- ActiveRecord::Base.clear_active_connections!
-
- sleep(3)
+ # Wait a bit and try again.
+ sleep(1)
get "/arvados/v1/groups", {
filters: [["name", "=", name]].to_json,
limit: 10
lockdir = "/var/lock"
lockprefix = "crunch-run-"
locksuffix = ".lock"
+ brokenfile = "crunch-run-broken"
)
// procinfo is saved in each process's lockfile.
proc, err := os.FindProcess(pi.PID)
if err != nil {
+ // FindProcess should have succeeded, even if the
+ // process does not exist.
return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
}
+ // Send the requested signal once, then send signal 0 a few
+ // times. When proc.Signal() returns an error (process no
+ // longer exists), return success. If that doesn't happen
+ // within 1 second, return an error.
err = proc.Signal(signal)
for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
err = proc.Signal(syscall.Signal(0))
}
if err == nil {
+ // Reached deadline without a proc.Signal() error.
return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
}
fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
if info.IsDir() && path != walkdir {
return filepath.SkipDir
}
- if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
+ if name := info.Name(); name == brokenfile {
+ fmt.Fprintln(stdout, "broken")
+ return nil
+ } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
return nil
}
if info.Size() == 0 {
func (runner *ContainerRunner) runBrokenNodeHook() {
if *brokenNodeHook == "" {
- runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+ path := filepath.Join(lockdir, brokenfile)
+ runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
+ f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
+ if err != nil {
+ runner.CrunchLog.Printf("Error writing %s: %s", path, err)
+ return
+ }
+ f.Close()
} else {
runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
// run killme script
c.Check(api.CalledWith("container.state", "Queued"), NotNil)
c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
- c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
}
func (s *TestSuite) TestFullBrokenDocker3(c *C) {
import collections
import functools
import arvados.keep
+from prometheus_client import Summary
import Queue
"""
+ fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
+ read_time = fuse_time.labels(op='read')
+ write_time = fuse_time.labels(op='write')
+ destroy_time = fuse_time.labels(op='destroy')
+ on_event_time = fuse_time.labels(op='on_event')
+ getattr_time = fuse_time.labels(op='getattr')
+ setattr_time = fuse_time.labels(op='setattr')
+ lookup_time = fuse_time.labels(op='lookup')
+ forget_time = fuse_time.labels(op='forget')
+ open_time = fuse_time.labels(op='open')
+ release_time = fuse_time.labels(op='release')
+ opendir_time = fuse_time.labels(op='opendir')
+ readdir_time = fuse_time.labels(op='readdir')
+ statfs_time = fuse_time.labels(op='statfs')
+ create_time = fuse_time.labels(op='create')
+ mkdir_time = fuse_time.labels(op='mkdir')
+ unlink_time = fuse_time.labels(op='unlink')
+ rmdir_time = fuse_time.labels(op='rmdir')
+ rename_time = fuse_time.labels(op='rename')
+ flush_time = fuse_time.labels(op='flush')
+
def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
super(Operations, self).__init__()
# initializing to continue
self.initlock.set()
+ def metric_samples(self):
+ return self.fuse_time.collect()[0].samples
+
+ def metric_op_names(self):
+ ops = []
+ for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
+ if cur_op not in ops:
+ ops.append(cur_op)
+ return ops
+
+ def metric_value(self, opname, metric):
+ op_value = [sample.value for sample in self.metric_samples()
+ if sample.name == metric and sample.labels['op'] == opname]
+ return op_value[0] if len(op_value) == 1 else None
+
+ def metric_sum_func(self, opname):
+ return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
+
+ def metric_count_func(self, opname):
+ return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
+
+ @destroy_time.time()
@catch_exceptions
def destroy(self):
self._shutdown_started.set()
[["event_type", "in", ["create", "update", "delete"]]],
self.on_event)
+ @on_event_time.time()
@catch_exceptions
def on_event(self, ev):
if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
self.inodes.inode_cache.find_by_uuid(newowner)):
parent.child_event(ev)
+ @getattr_time.time()
@catch_exceptions
def getattr(self, inode, ctx=None):
if inode not in self.inodes:
return entry
+ @setattr_time.time()
@catch_exceptions
def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
entry = self.getattr(inode)
return entry
+ @lookup_time.time()
@catch_exceptions
def lookup(self, parent_inode, name, ctx=None):
name = unicode(name, self.inodes.encoding)
parent_inode, name)
raise llfuse.FUSEError(errno.ENOENT)
+ @forget_time.time()
@catch_exceptions
def forget(self, inodes):
if self._shutdown_started.is_set():
if ent.dec_ref(nlookup) == 0 and ent.dead:
self.inodes.del_entry(ent)
+ @open_time.time()
@catch_exceptions
def open(self, inode, flags, ctx=None):
if inode in self.inodes:
return fh
+ @read_time.time()
@catch_exceptions
def read(self, fh, off, size):
_logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
self.read_counter.add(len(r))
return r
+ @write_time.time()
@catch_exceptions
def write(self, fh, off, buf):
_logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
self.write_counter.add(w)
return w
+ @release_time.time()
@catch_exceptions
def release(self, fh):
if fh in self._filehandles:
def releasedir(self, fh):
self.release(fh)
+ @opendir_time.time()
@catch_exceptions
def opendir(self, inode, ctx=None):
_logger.debug("arv-mount opendir: inode %i", inode)
self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
return fh
+ @readdir_time.time()
@catch_exceptions
def readdir(self, fh, off):
_logger.debug("arv-mount readdir: fh %i off %i", fh, off)
yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
e += 1
+ @statfs_time.time()
@catch_exceptions
def statfs(self, ctx=None):
st = llfuse.StatvfsData()
return p
+ @create_time.time()
@catch_exceptions
def create(self, inode_parent, name, mode, flags, ctx=None):
_logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
f.inc_ref()
return (fh, self.getattr(f.inode))
+ @mkdir_time.time()
@catch_exceptions
def mkdir(self, inode_parent, name, mode, ctx=None):
_logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
d.inc_ref()
return self.getattr(d.inode)
+ @unlink_time.time()
@catch_exceptions
def unlink(self, inode_parent, name, ctx=None):
_logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.unlink(name)
+ @rmdir_time.time()
@catch_exceptions
def rmdir(self, inode_parent, name, ctx=None):
_logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.rmdir(name)
+ @rename_time.time()
@catch_exceptions
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
_logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
dest = self._check_writable(inode_parent_new)
dest.rename(name_old, name_new, src)
+ @flush_time.time()
@catch_exceptions
def flush(self, fh):
if fh in self._filehandles:
import sys
import time
+from collections import namedtuple
-class Stat(object):
- def __init__(self, prefix, interval,
- egr_name, ing_name,
- egr_func, ing_func):
+Stat = namedtuple("Stat", ['name', 'get'])
+
+class StatWriter(object):
+ def __init__(self, prefix, interval, stats):
self.prefix = prefix
self.interval = interval
- self.egr_name = egr_name
- self.ing_name = ing_name
- self.egress = egr_func
- self.ingress = ing_func
- self.egr_prev = self.egress()
- self.ing_prev = self.ingress()
-
- def update(self):
- egr = self.egress()
- ing = self.ingress()
+ self.stats = stats
+ self.previous_stats = []
+ self.update_previous_stats()
- delta = " -- interval %.4f seconds %d %s %d %s" % (self.interval,
- egr - self.egr_prev,
- self.egr_name,
- ing - self.ing_prev,
- self.ing_name)
+ def update_previous_stats(self):
+ self.previous_stats = [stat.get() for stat in self.stats]
- sys.stderr.write("crunchstat: %s %d %s %d %s%s\n" % (self.prefix,
- egr,
- self.egr_name,
- ing,
- self.ing_name,
- delta))
+ def update(self):
+ def append_by_type(string, name, value):
+ if type(value) is float:
+ string += " %.6f %s" % (value, name)
+ else:
+ string += " %s %s" % (str(value), name)
+ return string
- self.egr_prev = egr
- self.ing_prev = ing
+ out = "crunchstat: %s" % self.prefix
+ delta = "-- interval %.4f seconds" % self.interval
+ for i, stat in enumerate(self.stats):
+ value = stat.get()
+ diff = value - self.previous_stats[i]
+ delta = append_by_type(delta, stat.name, diff)
+ out = append_by_type(out, stat.name, value)
+ sys.stderr.write("%s %s\n" % (out, delta))
+ self.update_previous_stats()
def statlogger(interval, keep, ops):
- calls = Stat("keepcalls", interval, "put", "get",
- keep.put_counter.get,
- keep.get_counter.get)
- net = Stat("net:keep0", interval, "tx", "rx",
- keep.upload_counter.get,
- keep.download_counter.get)
- cache = Stat("keepcache", interval, "hit", "miss",
- keep.hits_counter.get,
- keep.misses_counter.get)
- fuseops = Stat("fuseops", interval,"write", "read",
- ops.write_ops_counter.get,
- ops.read_ops_counter.get)
- blk = Stat("blkio:0:0", interval, "write", "read",
- ops.write_counter.get,
- ops.read_counter.get)
+ calls = StatWriter("keepcalls", interval, [
+ Stat("put", keep.put_counter.get),
+ Stat("get", keep.get_counter.get)
+ ])
+ net = StatWriter("net:keep0", interval, [
+ Stat("tx", keep.upload_counter.get),
+ Stat("rx", keep.download_counter.get)
+ ])
+ cache = StatWriter("keepcache", interval, [
+ Stat("hit", keep.hits_counter.get),
+ Stat("miss", keep.misses_counter.get)
+ ])
+ fuseops = StatWriter("fuseops", interval, [
+ Stat("write", ops.write_ops_counter.get),
+ Stat("read", ops.read_ops_counter.get)
+ ])
+ fusetimes = []
+ for cur_op in ops.metric_op_names():
+ name = "fuseop:{0}".format(cur_op)
+ fusetimes.append(StatWriter(name, interval, [
+ Stat("count", ops.metric_count_func(cur_op)),
+ Stat("time", ops.metric_sum_func(cur_op))
+ ]))
+ blk = StatWriter("blkio:0:0", interval, [
+ Stat("write", ops.write_counter.get),
+ Stat("read", ops.read_counter.get)
+ ])
while True:
time.sleep(interval)
calls.update()
net.update()
cache.update()
- fuseops.update()
blk.update()
+ fuseops.update()
+ for ftime in fusetimes:
+ ftime.update()
'llfuse >=1.2, <1.3.4',
'python-daemon',
'ciso8601 >= 2.0.0',
- 'setuptools'
+ 'setuptools',
+ "prometheus_client"
],
extras_require={
':python_version<"3"': ['pytz'],
fi
}
+docker_run_dev() {
+ docker run \
+ "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
+ "--volume=$SSO_ROOT:/usr/src/sso:rw" \
+ "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
+ "--volume=$WORKBENCH2_ROOT:/usr/src/workbench2:rw" \
+ "--volume=$PG_DATA:/var/lib/postgresql:rw" \
+ "--volume=$VAR_DATA:/var/lib/arvados:rw" \
+ "--volume=$PASSENGER:/var/lib/passenger:rw" \
+ "--volume=$GEMS:/var/lib/gems:rw" \
+ "--volume=$PIPCACHE:/var/lib/pip:rw" \
+ "--volume=$NPMCACHE:/var/lib/npm:rw" \
+ "--volume=$GOSTUFF:/var/lib/gopath:rw" \
+ "--volume=$RLIBS:/var/lib/Rlibs:rw" \
+ "$@"
+}
+
run() {
CONFIG=$1
TAG=$2
mkdir -p $VAR_DATA/test
if test "$need_setup" = 1 ; then
- docker run \
+ docker_run_dev \
--detach \
--name=$ARVBOX_CONTAINER \
--privileged \
- "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
- "--volume=$SSO_ROOT:/usr/src/sso:rw" \
- "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
- "--volume=$WORKBENCH2_ROOT:/usr/src/workbench2:rw" \
- "--volume=$PG_DATA:/var/lib/postgresql:rw" \
- "--volume=$VAR_DATA:/var/lib/arvados:rw" \
- "--volume=$PASSENGER:/var/lib/passenger:rw" \
- "--volume=$GEMS:/var/lib/gems:rw" \
- "--volume=$PIPCACHE:/var/lib/pip:rw" \
- "--volume=$NPMCACHE:/var/lib/npm:rw" \
- "--volume=$GOSTUFF:/var/lib/gopath:rw" \
- "--volume=$RLIBS:/var/lib/Rlibs:rw" \
"--env=SVDIR=/etc/test-service" \
arvados/arvbox-dev$TAG
GEM_HOME=/var/lib/gems \
"$@"
elif echo "$CONFIG" | grep 'dev$' ; then
- docker run \
+ docker_run_dev \
--detach \
--name=$ARVBOX_CONTAINER \
--privileged \
- "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
- "--volume=$SSO_ROOT:/usr/src/sso:rw" \
- "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
- "--volume=$WORKBENCH2_ROOT:/usr/src/workbench2:rw" \
- "--volume=$PG_DATA:/var/lib/postgresql:rw" \
- "--volume=$VAR_DATA:/var/lib/arvados:rw" \
- "--volume=$PASSENGER:/var/lib/passenger:rw" \
- "--volume=$GEMS:/var/lib/gems:rw" \
- "--volume=$PIPCACHE:/var/lib/pip:rw" \
- "--volume=$NPMCACHE:/var/lib/npm:rw" \
- "--volume=$GOSTUFF:/var/lib/gopath:rw" \
- "--volume=$RLIBS:/var/lib/Rlibs:rw" \
$PUBLIC \
arvados/arvbox-dev$TAG
updateconf
fi
;;
+ install-root-cert)
+ set -x
+ sudo cp $VAR_DATA/root-cert.pem /usr/local/share/ca-certificates/${ARVBOX_CONTAINER}-testing-cert.crt
+ sudo update-ca-certificates
+ ;;
+
+ devenv)
+ set -x
+ if docker ps -a --filter "status=exited" | grep -E "${ARVBOX_CONTAINER}-devenv$" -q ; then
+ docker start ${ARVBOX_CONTAINER}-devenv
+ elif ! (docker ps -a --filter "status=running" | grep -E "${ARVBOX_CONTAINER}-devenv$" -q) ; then
+ docker_run_dev \
+ --detach \
+ --name=${ARVBOX_CONTAINER}-devenv \
+ "--env=SVDIR=/etc/devenv-service" \
+ "--volume=$HOME:$HOME:rw" \
+ --volume=/tmp/.X11-unix:/tmp/.X11-unix:rw \
+ arvados/arvbox-dev$TAG
+ fi
+
+ exec docker exec --interactive --tty \
+ -e LINES=$(tput lines) \
+ -e COLUMNS=$(tput cols) \
+ -e TERM=$TERM \
+ -e "ARVBOX_HOME=$HOME" \
+ -e "DISPLAY=$DISPLAY" \
+ --workdir=$PWD \
+ ${ARVBOX_CONTAINER}-devenv \
+ /usr/local/lib/arvbox/devenv.sh "$@"
+ ;;
+
+ devenv-stop)
+ docker stop ${ARVBOX_CONTAINER}-devenv
+ ;;
+
+ devenv-reset)
+ docker stop ${ARVBOX_CONTAINER}-devenv
+ docker rm ${ARVBOX_CONTAINER}-devenv
+ ;;
+
*)
echo "Arvados-in-a-box http://arvados.org"
echo
linkchecker python3-virtualenv python-virtualenv xvfb iceweasel \
libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr \
libsecret-1-dev r-base r-cran-testthat libxml2-dev pandoc \
- python3-setuptools python3-pip && \
+ python3-setuptools python3-pip openjdk-8-jdk && \
apt-get clean
ENV RUBYVERSION_MINOR 2.3
ENV GEM_PATH /var/lib/gems
ENV PATH $PATH:/var/lib/gems/bin
-ENV GOVERSION 1.10.1
+ENV GOVERSION 1.11.5
# Install golang binary
RUN curl -f http://storage.googleapis.com/golang/go${GOVERSION}.linux-amd64.tar.gz | \
RUN pip install -U setuptools
-ENV NODEVERSION v6.11.4
+ENV NODEVERSION v8.15.1
# Install nodejs binary
RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
keep-setup.sh common.sh createusers.sh \
logger runsu.sh waitforpostgres.sh \
yml_override.py api-setup.sh \
- go-setup.sh \
+ go-setup.sh devenv.sh \
/usr/local/lib/arvbox/
ADD runit /etc/runit
RUN echo "development" > /var/lib/arvados/workbench_rails_env
RUN mkdir /etc/test-service && ln -sf /var/lib/arvbox/service/postgres /etc/test-service
+RUN mkdir /etc/devenv-service
\ No newline at end of file
export npm_config_cache=/var/lib/npm
export npm_config_cache_min=Infinity
export R_LIBS=/var/lib/Rlibs
+export HOME=$(getent passwd arvbox | cut -d: -f6)
if test -s /var/run/localip_override ; then
localip=$(cat /var/run/localip_override)
/var/lib/passenger /var/lib/gopath \
/var/lib/pip /var/lib/npm
+ if test -z "$ARVBOX_HOME" ; then
+ ARVBOX_HOME=/var/lib/arvados
+ fi
+
groupadd --gid $HOSTGID --non-unique arvbox
groupadd --gid $HOSTGID --non-unique git
- useradd --home-dir /var/lib/arvados \
+ useradd --home-dir $ARVBOX_HOME \
--uid $HOSTUID --gid $HOSTGID \
--non-unique \
--groups docker \
chown crunch:crunch -R /tmp/crunch0 /tmp/crunch1
echo "arvbox ALL=(crunch) NOPASSWD: ALL" >> /etc/sudoers
+
+ cat <<EOF > /etc/profile.d/paths.sh
+export PATH=/usr/local/bin:/usr/bin:/bin:/usr/local/go/bin:/var/lib/gems/bin:$(ls -d /usr/local/node-*)/bin
+export GEM_HOME=/var/lib/gems
+export GEM_PATH=/var/lib/gems
+export npm_config_cache=/var/lib/npm
+export npm_config_cache_min=Infinity
+export R_LIBS=/var/lib/Rlibs
+export GOPATH=/var/lib/gopath
+EOF
+
fi
if ! grep "^fuse:" /etc/group >/dev/null 2>/dev/null ; then
--- /dev/null
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+flock /var/lib/arvados/createusers.lock /usr/local/lib/arvbox/createusers.sh
+
+if [[ -n "$*" ]] ; then
+ exec su --preserve-environment arvbox -c "$*"
+else
+ exec su --login arvbox
+fi
+++ /dev/null
-/usr/local/lib/arvbox/runsu.sh
\ No newline at end of file
--- /dev/null
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+exec 2>&1
+set -ex -o pipefail
+
+. /usr/local/lib/arvbox/common.sh
+
+cat <<EOF >/var/lib/arvados/nginx.conf
+worker_processes auto;
+pid /var/lib/arvados/nginx.pid;
+
+error_log stderr;
+daemon off;
+user arvbox;
+
+events {
+ worker_connections 64;
+}
+
+http {
+ access_log off;
+ include /etc/nginx/mime.types;
+ default_type application/octet-stream;
+ server {
+ listen ${services[doc]} default_server;
+ listen [::]:${services[doc]} default_server;
+ root /usr/src/arvados/doc/.site;
+ index index.html;
+ server_name _;
+ }
+
+ server {
+ listen 80 default_server;
+ server_name _;
+ return 301 https://\$host\$request_uri;
+ }
+
+ upstream controller {
+ server localhost:${services[controller]};
+ }
+ server {
+ listen *:${services[controller-ssl]} ssl default_server;
+ server_name controller;
+ ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+ ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+ location / {
+ proxy_pass http://controller;
+ proxy_set_header Host \$http_host;
+ proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+ proxy_set_header X-Forwarded-Proto https;
+ proxy_redirect off;
+ }
+ }
+
+upstream arvados-ws {
+ server localhost:${services[websockets]};
+}
+server {
+ listen *:${services[websockets-ssl]} ssl default_server;
+ server_name websockets;
+
+ proxy_connect_timeout 90s;
+ proxy_read_timeout 300s;
+
+ ssl on;
+ ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+ ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+
+ location / {
+ proxy_pass http://arvados-ws;
+ proxy_set_header Upgrade \$http_upgrade;
+ proxy_set_header Connection "upgrade";
+ proxy_set_header Host \$http_host;
+ proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+ }
+}
+
+ upstream workbench2 {
+ server localhost:${services[workbench2]};
+ }
+ server {
+ listen *:${services[workbench2-ssl]} ssl default_server;
+ server_name workbench2;
+ ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+ ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+ location / {
+ proxy_pass http://workbench2;
+ proxy_set_header Host \$http_host;
+ proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+ proxy_set_header X-Forwarded-Proto https;
+ proxy_redirect off;
+ }
+ location /sockjs-node {
+ proxy_pass http://workbench2;
+ proxy_set_header Upgrade \$http_upgrade;
+ proxy_set_header Connection "upgrade";
+ proxy_set_header Host \$http_host;
+ proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+ }
+ }
+
+ upstream keep-web {
+ server localhost:${services[keep-web]};
+ }
+ server {
+ listen *:${services[keep-web-ssl]} ssl default_server;
+ server_name keep-web;
+ ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+ ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+ location / {
+ proxy_pass http://keep-web;
+ proxy_set_header Host \$http_host;
+ proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+ proxy_set_header X-Forwarded-Proto https;
+ proxy_redirect off;
+ }
+ }
+
+}
+
+EOF
+
+exec nginx -c /var/lib/arvados/nginx.conf
+++ /dev/null
-#!/bin/bash
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-exec 2>&1
-set -ex -o pipefail
-
-. /usr/local/lib/arvbox/common.sh
-
-cat <<EOF >/var/lib/arvados/nginx.conf
-worker_processes auto;
-pid /var/lib/arvados/nginx.pid;
-
-error_log stderr;
-daemon off;
-
-events {
- worker_connections 64;
-}
-
-http {
- access_log off;
- include /etc/nginx/mime.types;
- default_type application/octet-stream;
- server {
- listen ${services[doc]} default_server;
- listen [::]:${services[doc]} default_server;
- root /usr/src/arvados/doc/.site;
- index index.html;
- server_name _;
- }
-
- upstream controller {
- server localhost:${services[controller]};
- }
- server {
- listen *:${services[controller-ssl]} ssl default_server;
- server_name controller;
- ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
- ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
- location / {
- proxy_pass http://controller;
- proxy_set_header Host \$http_host;
- proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
- proxy_set_header X-Forwarded-Proto https;
- proxy_redirect off;
- }
- }
-
-upstream arvados-ws {
- server localhost:${services[websockets]};
-}
-server {
- listen *:${services[websockets-ssl]} ssl default_server;
- server_name websockets;
-
- proxy_connect_timeout 90s;
- proxy_read_timeout 300s;
-
- ssl on;
- ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
- ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
-
- location / {
- proxy_pass http://arvados-ws;
- proxy_set_header Upgrade \$http_upgrade;
- proxy_set_header Connection "upgrade";
- proxy_set_header Host \$http_host;
- proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
- }
-}
-
- upstream workbench2 {
- server localhost:${services[workbench2]};
- }
- server {
- listen *:${services[workbench2-ssl]} ssl default_server;
- server_name workbench2;
- ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
- ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
- location / {
- proxy_pass http://workbench2;
- proxy_set_header Host \$http_host;
- proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
- proxy_set_header X-Forwarded-Proto https;
- proxy_redirect off;
- }
- location /sockjs-node {
- proxy_pass http://workbench2;
- proxy_set_header Upgrade \$http_upgrade;
- proxy_set_header Connection "upgrade";
- proxy_set_header Host \$http_host;
- proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
- }
- }
-
- upstream keep-web {
- server localhost:${services[keep-web]};
- }
- server {
- listen *:${services[keep-web-ssl]} ssl default_server;
- server_name keep-web;
- ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
- ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
- location / {
- proxy_pass http://keep-web;
- proxy_set_header Host \$http_host;
- proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
- proxy_set_header X-Forwarded-Proto https;
- proxy_redirect off;
- }
- }
-
-}
-
-EOF
-
-exec nginx -c /var/lib/arvados/nginx.conf
import argparse
import gzip
+from io import open
import logging
import sys
help='Log more information (once for progress, twice for debug)')
+class UTF8Decode(object):
+ '''Wrap a file-like iterable to decode UTF-8 bytes into strings
+ '''
+ def __init__(self, fh):
+ self.fh = fh
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return next(self.fh).decode('utf-8')
+
+ next = __next__
+
+ def close(self):
+ # mimic GzipFile behavior and don't close the underlying object
+ pass
+
+
class Command(object):
def __init__(self, args):
self.args = args
self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
elif self.args.log_file:
if self.args.log_file.endswith('.gz'):
- fh = gzip.open(self.args.log_file)
+ fh = UTF8Decode(gzip.open(self.args.log_file))
else:
- fh = open(self.args.log_file)
+ fh = open(self.args.log_file, mode = 'r', encoding = 'utf-8')
self.summer = summarizer.Summarizer(fh, **kwargs)
else:
self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
'data': self._collate_data(tasks, stat),
'options': {
'connectSeparatedPoints': True,
- 'labels': ['elapsed']+[uuid for uuid, _ in tasks.iteritems()],
+ 'labels': ['elapsed']+[uuid for uuid, _ in tasks.items()],
'title': '{}: {} {}'.format(label, stat[0], stat[1]),
},
}
def _collate_data(self, tasks, stat):
data = []
nulls = []
- for uuid, task in tasks.iteritems():
+ for uuid, task in tasks.items():
for pt in task.series[stat]:
data.append([pt[0].total_seconds()] + nulls + [pt[1]])
nulls.append(None)
#
# SPDX-License-Identifier: AGPL-3.0
-from __future__ import print_function
-
import arvados
import itertools
-import Queue
+import queue
import threading
from crunchstat_summary import logger
self._queue.put(self.EOF)
def __iter__(self):
- self._queue = Queue.Queue()
+ self._queue = queue.Queue()
self._thread = threading.Thread(target=self._get_all_pages)
self._thread.daemon = True
self._thread.start()
return self
- def next(self):
+ def __next__(self):
line = self._queue.get()
if line is self.EOF:
self._thread.join()
raise StopIteration
return line
+ next = __next__ # for Python 2
+
def __enter__(self):
return self
#
# SPDX-License-Identifier: AGPL-3.0
-from __future__ import print_function
-
import arvados
import collections
import crunchstat_summary.dygraphs
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
-
+MB=2**20
# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
if 'tx' in stats or 'rx' in stats:
stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
- for stat, val in stats.iteritems():
+ for stat, val in stats.items():
if group == 'interval':
if stat == 'seconds':
this_interval_s = val
self.job_tot = collections.defaultdict(
functools.partial(collections.defaultdict, int))
- for task_id, task_stat in self.task_stats.iteritems():
- for category, stat_last in task_stat.iteritems():
- for stat, val in stat_last.iteritems():
+ for task_id, task_stat in self.task_stats.items():
+ for category, stat_last in task_stat.items():
+ for stat, val in stat_last.items():
if stat in ['cpus', 'cache', 'swap', 'rss']:
# meaningless stats like 16 cpu cores x 5 tasks = 80
continue
def _text_report_gen(self):
yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
- for category, stat_max in sorted(self.stats_max.iteritems()):
- for stat, val in sorted(stat_max.iteritems()):
+ for category, stat_max in sorted(self.stats_max.items()):
+ for stat, val in sorted(stat_max.items()):
if stat.endswith('__rate'):
continue
max_rate = self._format(stat_max.get(stat+'__rate', '-'))
self.stats_max['cpu']['user+sys__rate'],
lambda x: x * 100),
('Overall CPU usage: {}%',
- self.job_tot['cpu']['user+sys'] /
+ float(self.job_tot['cpu']['user+sys']) /
self.job_tot['time']['elapsed']
if self.job_tot['time']['elapsed'] > 0 else 0,
lambda x: x * 100),
yield "# "+format_string.format(self._format(val))
def _recommend_gen(self):
+ # TODO recommend fixing job granularity if elapsed time is too short
return itertools.chain(
self._recommend_cpu(),
self._recommend_ram(),
constraint_key = self._map_runtime_constraint('vcpus')
cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
- if cpu_max_rate == float('-Inf'):
+ if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
logger.warning('%s: no CPU usage data', self.label)
return
+ # TODO Don't necessarily want to recommend on isolated max peak
+ # take average CPU usage into account as well or % time at max
used_cores = max(1, int(math.ceil(cpu_max_rate)))
asked_cores = self.existing_constraints.get(constraint_key)
- if asked_cores is None or used_cores < asked_cores:
+ if asked_cores is None:
+ asked_cores = 1
+ # TODO: This should be more nuanced in cases where max >> avg
+ if used_cores < asked_cores:
yield (
'#!! {} max CPU usage was {}% -- '
- 'try runtime_constraints "{}":{}'
+ 'try reducing runtime_constraints to "{}":{}'
).format(
self.label,
- int(math.ceil(cpu_max_rate*100)),
+ math.ceil(cpu_max_rate*100),
constraint_key,
int(used_cores))
+ # FIXME: This needs to be updated to account for current nodemanager algorithms
def _recommend_ram(self):
"""Recommend an economical RAM constraint for this job.
if used_bytes == float('-Inf'):
logger.warning('%s: no memory usage data', self.label)
return
- used_mib = math.ceil(float(used_bytes) / 1048576)
+ used_mib = math.ceil(float(used_bytes) / MB)
asked_mib = self.existing_constraints.get(constraint_key)
nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
- if asked_mib is None or (
- math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
+ if used_mib > 0 and (asked_mib is None or (
+ math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
yield (
'#!! {} max RSS was {} MiB -- '
- 'try runtime_constraints "{}":{}'
+ 'try reducing runtime_constraints to "{}":{}'
).format(
self.label,
int(used_mib),
constraint_key,
- int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))
+ int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
def _recommend_keep_cache(self):
"""Recommend increasing keep cache if utilization < 80%"""
return
utilization = (float(self.job_tot['blkio:0:0']['read']) /
float(self.job_tot['net:keep0']['rx']))
- asked_mib = self.existing_constraints.get(constraint_key, 256)
+ # FIXME: the default on this get won't work correctly
+ asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
if utilization < 0.8:
yield (
'#!! {} Keep cache utilization was {:.2f}% -- '
- 'try runtime_constraints "{}":{} (or more)'
+ 'try doubling runtime_constraints to "{}":{} (or more)'
).format(
self.label,
utilization * 100.0,
constraint_key,
- asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())
+ math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
def _format(self, val):
class JobSummarizer(ProcessSummarizer):
- runtime_constraint_mem_unit = 1048576
+ runtime_constraint_mem_unit = MB
map_runtime_constraint = {
'keep_cache_ram': 'keep_cache_mb_per_task',
'ram': 'min_ram_mb_per_node',
def run(self):
threads = []
- for child in self.children.itervalues():
+ for child in self.children.values():
self.throttle.acquire()
t = threading.Thread(target=self.run_and_release, args=(child.run, ))
t.daemon = True
def text_report(self):
txt = ''
d = self._descendants()
- for child in d.itervalues():
+ for child in d.values():
if len(d) > 1:
txt += '### Summary for {} ({})\n'.format(
child.label, child.process['uuid'])
MultiSummarizers) are omitted.
"""
d = collections.OrderedDict()
- for key, child in self.children.iteritems():
+ for key, child in self.children.items():
if isinstance(child, Summarizer):
d[key] = child
if isinstance(child, MultiSummarizer):
return d
def html_report(self):
- return WEBCHART_CLASS(self.label, self._descendants().itervalues()).html()
+ return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
class JobTreeSummarizer(MultiSummarizer):
preloaded = {}
for j in arv.jobs().index(
limit=len(job['components']),
- filters=[['uuid','in',job['components'].values()]]).execute()['items']:
+ filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
preloaded[j['uuid']] = j
for cname in sorted(job['components'].keys()):
child_uuid = job['components'][cname]
class PipelineSummarizer(MultiSummarizer):
def __init__(self, instance, **kwargs):
children = collections.OrderedDict()
- for cname, component in instance['components'].iteritems():
+ for cname, component in instance['components'].items():
if 'job' not in component:
logger.warning(
"%s: skipping component with no job assigned", cname)
cr['name'] = cr.get('name') or cr['uuid']
todo.append(cr)
sorted_children = collections.OrderedDict()
- for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
+ for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
sorted_children[uuid] = children[uuid]
super(ContainerTreeSummarizer, self).__init__(
children=sorted_children,
#
# SPDX-License-Identifier: AGPL-3.0
-import cgi
+try:
+ from html import escape
+except ImportError:
+ from cgi import escape
+
import json
import pkg_resources
<script type="text/javascript">{}</script>
{}
</head><body></body></html>
- '''.format(cgi.escape(self.label),
+ '''.format(escape(self.label),
self.JSLIB, self.js(), self.headHTML())
def js(self):
return 'var chartdata = {};\n{}'.format(
json.dumps(self.sections()),
- '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa) for jsa in self.JSASSETS]))
+ '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa).decode('utf-8') for jsa in self.JSASSETS]))
def sections(self):
return [
# Number of tasks: 1
# Max CPU time spent by a single task: 0s
# Max CPU usage in a single interval: 0%
-# Overall CPU usage: 0%
+# Overall CPU usage: 0.00%
# Max memory used by a single task: 0.00GB
# Max network traffic in a single task: 0.00GB
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! container max CPU usage was 0% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 0 MiB -- try runtime_constraints "ram":0
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! container max CPU usage was 24% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 67 MiB -- try runtime_constraints "ram":1020054732
+#!! container max RSS was 67 MiB -- try reducing runtime_constraints to "ram":1020054732
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! container max CPU usage was 24% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 67 MiB -- try runtime_constraints "ram":1020054732
+#!! container max RSS was 67 MiB -- try reducing runtime_constraints to "ram":1020054732
# Max network speed in a single interval: 42.58MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-jq0ekny1xou3zoh max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-zvb2ocfycpomrup max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-v831jm2uq0g2g9x max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
import difflib
import glob
import gzip
+from io import open
import mock
import os
+import sys
import unittest
+from crunchstat_summary.command import UTF8Decode
+
TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
class ReportDiff(unittest.TestCase):
def diff_known_report(self, logfile, cmd):
expectfile = logfile+'.report'
- expect = open(expectfile).readlines()
+ with open(expectfile, encoding='utf-8') as f:
+ expect = f.readlines()
self.diff_report(cmd, expect, expectfile=expectfile)
- def diff_report(self, cmd, expect, expectfile=None):
+ def diff_report(self, cmd, expect, expectfile='(expected)'):
got = [x+"\n" for x in cmd.report().strip("\n").split("\n")]
self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
expect, got, fromfile=expectfile, tofile="(generated)")))
['--format=html', '--log-file', logfile])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
- self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
+ if sys.version_info >= (3,2):
+ self.assertRegex(cmd.report(), r'(?is)<html>.*</html>\s*$')
+ else:
+ self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
class SummarizeEdgeCases(unittest.TestCase):
def test_error_messages(self):
- logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'))
+ logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'), encoding='utf-8')
s = crunchstat_summary.summarizer.Summarizer(logfile)
s.run()
'container.json', 'crunchstat.txt', 'arv-mount.txt']
def _open(n):
if n == "crunchstat.txt":
- return gzip.open(self.logfile)
+ return UTF8Decode(gzip.open(self.logfile))
elif n == "arv-mount.txt":
- return gzip.open(self.arvmountlog)
+ return UTF8Decode(gzip.open(self.arvmountlog))
mock_cr().open.side_effect = _open
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_request['uuid']])
def test_job_report(self, mock_api, mock_cr):
mock_api().jobs().get().execute.return_value = self.fake_job
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.return_value = gzip.open(self.logfile)
+ mock_cr().open.return_value = UTF8Decode(gzip.open(self.logfile))
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_job_uuid])
cmd = crunchstat_summary.command.Command(args)
mock_api().pipeline_instances().get().execute. \
return_value = self.fake_instance
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+ mock_cr().open.side_effect = [UTF8Decode(gzip.open(logfile)) for _ in range(3)]
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--pipeline-instance', self.fake_instance['uuid']])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
- job_report = [
- line for line in open(logfile+'.report').readlines()
- if not line.startswith('#!! ')]
+ with open(logfile+'.report', encoding='utf-8') as f:
+ job_report = [line for line in f if not line.startswith('#!! ')]
expect = (
['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
job_report + ['\n'] +
mock_api().jobs().index().execute.return_value = self.fake_jobs_index
mock_api().jobs().get().execute.return_value = self.fake_job
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+ mock_cr().open.side_effect = [UTF8Decode(gzip.open(logfile)) for _ in range(3)]
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_job['uuid']])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
- job_report = [
- line for line in open(logfile+'.report').readlines()
- if not line.startswith('#!! ')]
+ with open(logfile+'.report', encoding='utf-8') as f:
+ job_report = [line for line in f if not line.startswith('#!! ')]
expect = (
['### Summary for zzzzz-8i9sb-i3e77t9z5y8j9cc (partial) (zzzzz-8i9sb-i3e77t9z5y8j9cc)\n',
'(no report generated)\n',