fi
}
-title () {
- txt="********** $1 **********"
- printf "\n%*s%s\n\n" $((($COLUMNS-${#txt})/2)) "" "$txt"
+title() {
+ printf '%s %s\n' "=======" "$1"
}
checkexit() {
if [[ "$1" != "0" ]]; then
- title "!!!!!! $2 FAILED !!!!!!"
+ title "$2 -- FAILED"
failures+=("$2 (`timer`)")
else
successes+=("$2 (`timer`)")
if [[ ${#failures[@]} == 0 ]]
then
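+ # Only claim overall success when at least one suite actually ran.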
- echo "All test suites passed."
+ if [[ ${#successes[@]} != 0 ]]; then
+ echo "All test suites passed."
+ fi
else
echo "Failures (${#failures[@]}):"
for x in "${failures[@]}"
Options:
--skip FOO Do not test the FOO component.
+--skip sanity Skip initial dev environment sanity checks.
+--skip install Do not run any install steps. Just run tests.
+ You should provide GOPATH, GEMHOME, and VENVDIR options
+ from a previous invocation if you use this option.
--only FOO Do not test anything except the FOO component.
--temp DIR Install components and dependencies under DIR instead of
making a new temporary directory. Implies --leave-temp.
subsequent invocations.
--repeat N Repeat each install/test step until it succeeds N times.
--retry Prompt to retry if an install or test suite fails.
---skip-install Do not run any install steps. Just run tests.
- You should provide GOPATH, GEMHOME, and VENVDIR options
- from a previous invocation if you use this option.
--only-install Run specific install step
--short Skip (or scale down) some slow tests.
+--interactive Set up, then prompt for test/install steps to perform.
WORKSPACE=path Arvados source tree to test.
CONFIGSRC=path Dir with api server config files to copy into source tree.
(If none given, leave config files alone in source tree.)
envvar=value Set \$envvar to value. Primarily useful for WORKSPACE,
*_test, and other examples shown above.
-Assuming --skip-install is not given, all components are installed
+Assuming "--skip install" is not given, all components are installed
into \$GOPATH, \$VENVDIR, and \$GEMHOME before running any tests. Many
test suites depend on other components being installed, and installing
everything tends to be quicker than debugging dependencies.
exit_cleanly() {
trap - INT
- create-plot-data-from-log.sh $BUILD_NUMBER "$WORKSPACE/apps/workbench/log/test.log" "$WORKSPACE/apps/workbench/log/"
+ if which create-plot-data-from-log.sh >/dev/null; then
+ create-plot-data-from-log.sh $BUILD_NUMBER "$WORKSPACE/apps/workbench/log/test.log" "$WORKSPACE/apps/workbench/log/"
+ fi
rotate_logfile "$WORKSPACE/apps/workbench/log/" "test.log"
stop_services
rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
}
sanity_checks() {
+ [[ -n "${skip[sanity]}" ]] && return 0
( [[ -n "$WORKSPACE" ]] && [[ -d "$WORKSPACE/services" ]] ) \
|| fatal "WORKSPACE environment variable not set to a source directory (see: $0 --help)"
echo Checking dependencies:
--short)
short=1
;;
+ --interactive)
+ interactive=1
+ ;;
--skip-install)
- only_install=nothing
+ skip[install]=1
;;
--only-install)
only_install="$1"; shift
fi
start_services() {
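+ # ARVADOS_TEST_API_HOST is already set: the test services are already running, so there is nothing to start.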
+ if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
+ return 0
+ fi
+ . "$VENVDIR/bin/activate"
echo 'Starting API, keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
if [[ ! -d "$WORKSPACE/services/api/log" ]]; then
mkdir -p "$WORKSPACE/services/api/log"
if [[ -f "$WORKSPACE/tmp/api.pid" && ! -s "$WORKSPACE/tmp/api.pid" ]]; then
rm -f "$WORKSPACE/tmp/api.pid"
fi
+ all_services_stopped=
+ fail=0
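+ # In the eval'ed steps below, '|| echo "fail=1; false"' sets the flag and makes eval return nonzero, so the && chain stops as soon as a server fails to start.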
cd "$WORKSPACE" \
- && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo fail=1) \
+ && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo "fail=1; false") \
&& export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
&& export ARVADOS_TEST_API_INSTALLED="$$" \
&& python sdk/python/tests/run_test_server.py start_controller \
&& python sdk/python/tests/run_test_server.py start_keep-web \
&& python sdk/python/tests/run_test_server.py start_arv-git-httpd \
&& python sdk/python/tests/run_test_server.py start_ws \
- && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo fail=1) \
- && (env | egrep ^ARVADOS)
- if [[ -n "$fail" ]]; then
- return 1
+ && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo "fail=1; false") \
+ && (env | egrep ^ARVADOS) \
+ || fail=1
+ deactivate
+ if [[ $fail != 0 ]]; then
+ unset ARVADOS_TEST_API_HOST
fi
+ return $fail
}
stop_services() {
- if [[ -z "$ARVADOS_TEST_API_HOST" ]]; then
+ if [[ -n "$all_services_stopped" ]]; then
return
fi
unset ARVADOS_TEST_API_HOST
+ . "$VENVDIR/bin/activate" || return
cd "$WORKSPACE" \
&& python sdk/python/tests/run_test_server.py stop_nginx \
&& python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
&& python sdk/python/tests/run_test_server.py stop_keep-web \
&& python sdk/python/tests/run_test_server.py stop_keep_proxy \
&& python sdk/python/tests/run_test_server.py stop_controller \
- && python sdk/python/tests/run_test_server.py stop
+ && python sdk/python/tests/run_test_server.py stop \
+ && all_services_stopped=1
+ deactivate
}
interrupt() {
}
trap interrupt INT
-sanity_checks
-
-echo "WORKSPACE=$WORKSPACE"
-
-if [[ -z "$CONFIGSRC" ]] && [[ -d "$HOME/arvados-api-server" ]]; then
- # Jenkins expects us to use this by default.
- CONFIGSRC="$HOME/arvados-api-server"
-fi
-
-# Clean up .pyc files that may exist in the workspace
-cd "$WORKSPACE"
-find -name '*.pyc' -delete
-
-if [[ -z "$temp" ]]; then
- temp="$(mktemp -d)"
-fi
-
-# Set up temporary install dirs (unless existing dirs were supplied)
-for tmpdir in VENVDIR VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
-do
- if [[ -z "${!tmpdir}" ]]; then
- eval "$tmpdir"="$temp/$tmpdir"
- fi
- if ! [[ -d "${!tmpdir}" ]]; then
- mkdir "${!tmpdir}" || fatal "can't create ${!tmpdir} (does $temp exist?)"
- fi
-done
-
-rm -vf "${WORKSPACE}/tmp/*.log"
-
setup_ruby_environment() {
if [[ -s "$HOME/.rvm/scripts/rvm" ]] ; then
source "$HOME/.rvm/scripts/rvm"
"$venvdest/bin/pip" install --no-cache-dir 'mock>=1.0' 'pbr<1.7.0'
}
-export PERLINSTALLBASE
-export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
+initialize() {
+ sanity_checks
-export R_LIBS
+ echo "WORKSPACE=$WORKSPACE"
-export GOPATH
-(
- set -e
- mkdir -p "$GOPATH/src/git.curoverse.com"
- if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
- for d in \
- "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
- "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
- "$GOPATH/src/git.curoverse.com/arvados.git"; do
- [[ -d "$d" ]] && rmdir "$d"
- done
+ if [[ -z "$CONFIGSRC" ]] && [[ -d "$HOME/arvados-api-server" ]]; then
+ # Jenkins expects us to use this by default.
+ CONFIGSRC="$HOME/arvados-api-server"
fi
- for d in \
- "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
- "$GOPATH/src/git.curoverse.com/arvados.git"; do
- [[ -h "$d" ]] && rm "$d"
- done
- ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
- go get -v github.com/kardianos/govendor
- cd "$GOPATH/src/git.curoverse.com/arvados.git"
- if [[ -n "$short" ]]; then
- go get -v -d ...
- "$GOPATH/bin/govendor" sync
- else
- # Remove cached source dirs in workdir. Otherwise, they will
- # not qualify as +missing or +external below, and we won't be
- # able to detect that they're missing from vendor/vendor.json.
- rm -rf vendor/*/
- go get -v -d ...
- "$GOPATH/bin/govendor" sync
- [[ -z $("$GOPATH/bin/govendor" list +unused +missing +external | tee /dev/stderr) ]] \
- || fatal "vendor/vendor.json has unused or missing dependencies -- try:
-(export GOPATH=\"${GOPATH}\"; cd \$GOPATH/src/git.curoverse.com/arvados.git && \$GOPATH/bin/govendor add +missing +external && \$GOPATH/bin/govendor remove +unused)
+ # Clean up .pyc files that may exist in the workspace
+ cd "$WORKSPACE"
+ find -name '*.pyc' -delete
-";
+ if [[ -z "$temp" ]]; then
+ temp="$(mktemp -d)"
fi
-) || fatal "Go setup failed"
-setup_virtualenv "$VENVDIR" --python python2.7
-. "$VENVDIR/bin/activate"
+ # Set up temporary install dirs (unless existing dirs were supplied)
+ for tmpdir in VENVDIR VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
+ do
+ if [[ -z "${!tmpdir}" ]]; then
+ eval "$tmpdir"="$temp/$tmpdir"
+ fi
+ if ! [[ -d "${!tmpdir}" ]]; then
+ mkdir "${!tmpdir}" || fatal "can't create ${!tmpdir} (does $temp exist?)"
+ fi
+ done
-# Needed for run_test_server.py which is used by certain (non-Python) tests.
-pip install --no-cache-dir PyYAML \
- || fatal "pip install PyYAML failed"
+ rm -vf "${WORKSPACE}/tmp/*.log"
-# Preinstall libcloud if using a fork; otherwise nodemanager "pip
-# install" won't pick it up by default.
-if [[ -n "$LIBCLOUD_PIN_SRC" ]]; then
- pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
- || pip install --pre --ignore-installed --no-cache-dir "$LIBCLOUD_PIN_SRC" >/dev/null \
- || fatal "pip install apache-libcloud failed"
-fi
+ export PERLINSTALLBASE
+ export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
-# Deactivate Python 2 virtualenv
-deactivate
+ export R_LIBS
-declare -a pythonstuff
-pythonstuff=(
- sdk/pam
- sdk/python
- sdk/python:py3
- sdk/cwl
- sdk/cwl:py3
- services/dockercleaner:py3
- services/fuse
- services/nodemanager
- tools/crunchstat-summary
- )
+ export GOPATH
-# If Python 3 is available, set up its virtualenv in $VENV3DIR.
-# Otherwise, skip dependent tests.
-PYTHON3=$(which python3)
-if [[ ${?} = 0 ]]; then
- setup_virtualenv "$VENV3DIR" --python python3
-else
- PYTHON3=
- cat >&2 <<EOF
+ # Jenkins config requires that glob tmp/*.log match something. Ensure
+ # that happens even if we don't end up running services that set up
+ # logging.
+ mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
+ touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
-Warning: python3 could not be found. Python 3 tests will be skipped.
+ unset http_proxy https_proxy no_proxy
-EOF
-fi
-# Reactivate Python 2 virtualenv
-. "$VENVDIR/bin/activate"
+ # Note: this must be the last time we change PATH, otherwise rvm will
+ # whine a lot.
+ setup_ruby_environment
+
+ echo "PATH is $PATH"
+}
+
+install_env() {
+ (
+ set -e
+ mkdir -p "$GOPATH/src/git.curoverse.com"
+ if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
+ for d in \
+ "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
+ "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
+ "$GOPATH/src/git.curoverse.com/arvados.git"; do
+ [[ -d "$d" ]] && rmdir "$d"
+ done
+ fi
+ for d in \
+ "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
+ "$GOPATH/src/git.curoverse.com/arvados.git"; do
+ [[ -h "$d" ]] && rm "$d"
+ done
+ ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+ go get -v github.com/kardianos/govendor
+ cd "$GOPATH/src/git.curoverse.com/arvados.git"
+ if [[ -n "$short" ]]; then
+ go get -v -d ...
+ "$GOPATH/bin/govendor" sync
+ else
+ # Remove cached source dirs in workdir. Otherwise, they will
+ # not qualify as +missing or +external below, and we won't be
+ # able to detect that they're missing from vendor/vendor.json.
+ rm -rf vendor/*/
+ go get -v -d ...
+ "$GOPATH/bin/govendor" sync
+ [[ -z $("$GOPATH/bin/govendor" list +unused +missing +external | tee /dev/stderr) ]] \
+ || fatal "vendor/vendor.json has unused or missing dependencies -- try:
+
+(export GOPATH=\"${GOPATH}\"; cd \$GOPATH/src/git.curoverse.com/arvados.git && \$GOPATH/bin/govendor add +missing +external && \$GOPATH/bin/govendor remove +unused)
-# Note: this must be the last time we change PATH, otherwise rvm will
-# whine a lot.
-setup_ruby_environment
+";
+ fi
+ ) || fatal "Go setup failed"
-echo "PATH is $PATH"
+ setup_virtualenv "$VENVDIR" --python python2.7
+ . "$VENVDIR/bin/activate"
-if ! which bundler >/dev/null
-then
- gem install --user-install bundler || fatal 'Could not install bundler'
-fi
+ # Needed for run_test_server.py which is used by certain (non-Python) tests.
+ pip install --no-cache-dir PyYAML \
+ || fatal "pip install PyYAML failed"
+
+ # Preinstall libcloud if using a fork; otherwise nodemanager "pip
+ # install" won't pick it up by default.
+ if [[ -n "$LIBCLOUD_PIN_SRC" ]]; then
+ pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
+ || pip install --pre --ignore-installed --no-cache-dir "$LIBCLOUD_PIN_SRC" >/dev/null \
+ || fatal "pip install apache-libcloud failed"
+ fi
-# Jenkins config requires that glob tmp/*.log match something. Ensure
-# that happens even if we don't end up running services that set up
-# logging.
-mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
-touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
+ # Deactivate Python 2 virtualenv
+ deactivate
+
+ # If Python 3 is available, set up its virtualenv in $VENV3DIR.
+ # Otherwise, skip dependent tests.
+ PYTHON3=$(which python3)
+ if [[ ${?} = 0 ]]; then
+ setup_virtualenv "$VENV3DIR" --python python3
+ else
+ PYTHON3=
+ cat >&2 <<EOF
+
+Warning: python3 could not be found. Python 3 tests will be skipped.
+
+EOF
+ fi
+
+ if ! which bundler >/dev/null
+ then
+ gem install --user-install bundler || fatal 'Could not install bundler'
+ fi
+}
retry() {
remain="${repeat}"
if ${@}; then
if [[ "$remain" -gt 1 ]]; then
remain=$((${remain}-1))
- title "Repeating ${remain} more times"
+ title "(repeating ${remain} more times)"
else
break
fi
suite="${1}"
;;
esac
- if [[ -z "${skip[$suite]}" && -z "${skip[$1]}" && \
- (${#only[@]} -eq 0 || ${only[$suite]} -eq 1 || \
- ${only[$1]} -eq 1) ||
- ${only[$2]} -eq 1 ]]; then
- retry do_test_once ${@}
- else
- title "Skipping ${1} tests"
+ if [[ -n "${skip[$suite]}" || \
+ -n "${skip[$1]}" || \
+ (${#only[@]} -ne 0 && ${only[$suite]} -eq 0 && ${only[$1]} -eq 0) ]]; then
+ return 0
fi
+ case "${1}" in
+ services/api)
+ stop_services
+ ;;
+ doc | lib/cli | lib/cloud/azure | lib/cloud/ec2 | lib/cmd | lib/dispatchcloud/ssh_executor | lib/dispatchcloud/worker)
+ # don't care whether services are running
+ ;;
+ *)
+ if ! start_services; then
+ title "test $1 -- failed to start services"
+ return 1
+ fi
+ ;;
+ esac
+ retry do_test_once ${@}
}
do_test_once() {
unset result
- title "Running $1 tests"
+ title "test $1"
timer_reset
- if [[ "$2" == "go" ]]
+
+ if which deactivate >/dev/null; then deactivate; fi
+ if ! . "$VENVDIR/bin/activate"
+ then
+ result=1
+ elif [[ "$2" == "go" ]]
then
covername="coverage-$(echo "$1" | sed -e 's/\//_/g')"
coverflags=("-covermode=count" "-coverprofile=$WORKSPACE/tmp/.$covername.tmp")
fi
result=${result:-$?}
checkexit $result "$1 tests"
- title "End of $1 tests (`timer`)"
+ title "test $1 -- `timer`"
return $result
}
do_install() {
- skipit=false
-
- if [[ -z "${only_install}" || "${only_install}" == "${1}" || "${only_install}" == "${2}" ]]; then
- retry do_install_once ${@}
- else
- skipit=true
- fi
-
- if [[ "$skipit" = true ]]; then
- title "Skipping $1 install"
- fi
+ if [[ -n "${skip[install]}" || ( -n "${only_install}" && "${only_install}" != "${1}" && "${only_install}" != "${2}" ) ]]; then
+ return 0
+ fi
+ retry do_install_once ${@}
}
do_install_once() {
- title "Running $1 install"
+ title "install $1"
timer_reset
- if [[ "$2" == "go" ]]
+
+ if which deactivate >/dev/null; then deactivate; fi
+ if [[ "$1" != "env" ]] && ! . "$VENVDIR/bin/activate"; then
+ result=1
+ elif [[ "$2" == "go" ]]
then
go get -ldflags "-X main.version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}" -t "git.curoverse.com/arvados.git/$1"
elif [[ "$2" == "pip" ]]
else
"install_$1"
fi
- result=$?
+ result=${result:-$?}
checkexit $result "$1 install"
- title "End of $1 install (`timer`)"
+ title "install $1 -- `timer`"
return $result
}
&& bundle_install_trylocal \
&& rm -rf .site
}
-do_install doc
install_gem() {
gemname=$1
&& with_test_gemset gem install --no-ri --no-rdoc $(ls -t "$gemname"-*.gem|head -n1)
}
-install_ruby_sdk() {
+install_sdk/ruby() {
install_gem arvados sdk/ruby
}
-do_install sdk/ruby ruby_sdk
-install_R_sdk() {
+install_sdk/R() {
if [[ "$NEED_SDK_R" = true ]]; then
cd "$WORKSPACE/sdk/R" \
&& Rscript --vanilla install_deps.R
fi
}
-do_install sdk/R R_sdk
-install_perl_sdk() {
+install_sdk/perl() {
cd "$WORKSPACE/sdk/perl" \
&& perl Makefile.PL INSTALL_BASE="$PERLINSTALLBASE" \
&& make install INSTALLDIRS=perl
}
-do_install sdk/perl perl_sdk
-install_cli() {
+install_sdk/cli() {
install_gem arvados-cli sdk/cli
}
-do_install sdk/cli cli
-install_login-sync() {
+install_services/login-sync() {
install_gem arvados-login-sync services/login-sync
}
-do_install services/login-sync login-sync
-
-# Install the Python SDK early. Various other test suites (like
-# keepproxy) bring up run_test_server.py, which imports the arvados
-# module. We can't actually *test* the Python SDK yet though, because
-# its own test suite brings up some of those other programs (like
-# keepproxy).
-for p in "${pythonstuff[@]}"
-do
- dir=${p%:py3}
- if [[ ${dir} = ${p} ]]; then
- if [[ -z ${skip[python2]} ]]; then
- do_install ${dir} pip
- fi
- elif [[ -n ${PYTHON3} ]]; then
- if [[ -z ${skip[python3]} ]]; then
- do_install ${dir} pip "$VENV3DIR/bin/"
- fi
- fi
-done
-install_apiserver() {
+install_services/api() {
cd "$WORKSPACE/services/api" \
        && RAILS_ENV=test bundle_install_trylocal \
&& RAILS_ENV=test bundle exec rake db:setup \
&& RAILS_ENV=test bundle exec rake db:fixtures:load
}
-do_install services/api apiserver
+
+declare -a pythonstuff
+pythonstuff=(
+ sdk/pam
+ sdk/python
+ sdk/python:py3
+ sdk/cwl
+ sdk/cwl:py3
+ services/dockercleaner:py3
+ services/fuse
+ services/nodemanager
+ tools/crunchstat-summary
+)
declare -a gostuff
gostuff=(
tools/keep-rsync
tools/sync-groups
)
-for g in "${gostuff[@]}"
-do
- do_install "$g" go
-done
-install_workbench() {
+install_apps/workbench() {
cd "$WORKSPACE/apps/workbench" \
&& mkdir -p tmp/cache \
&& RAILS_ENV=test bundle_install_trylocal \
&& RAILS_ENV=test RAILS_GROUPS=assets bundle exec rake npm:install
}
-do_install apps/workbench workbench
-unset http_proxy https_proxy no_proxy
-
-test_doclinkchecker() {
+test_doc() {
(
set -e
cd "$WORKSPACE/doc"
PYTHONPATH=$WORKSPACE/sdk/python/ bundle exec rake linkchecker baseurl=file://$WORKSPACE/doc/.site/ arvados_workbench_host=https://workbench.$ARVADOS_API_HOST arvados_api_host=$ARVADOS_API_HOST
)
}
-do_test doc doclinkchecker
-
-stop_services
-test_apiserver() {
+test_services/api() {
rm -f "$WORKSPACE/services/api/git-commit.version"
cd "$WORKSPACE/services/api" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test TESTOPTS=-v ${testargs[services/api]}
}
-do_test services/api apiserver
-
-# Shortcut for when we're only running apiserver tests. This saves a bit of time,
-# because we don't need to start up the api server for subsequent tests.
-if [ ! -z "$only" ] && [ "$only" == "services/api" ]; then
- rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
- exit_cleanly
-fi
-
-start_services || { stop_services; fatal "start_services"; }
-test_ruby_sdk() {
+test_sdk/ruby() {
cd "$WORKSPACE/sdk/ruby" \
&& bundle exec rake test TESTOPTS=-v ${testargs[sdk/ruby]}
}
-do_test sdk/ruby ruby_sdk
-test_R_sdk() {
+test_sdk/R() {
if [[ "$NEED_SDK_R" = true ]]; then
cd "$WORKSPACE/sdk/R" \
&& Rscript --vanilla run_test.R
fi
}
-do_test sdk/R R_sdk
-
-test_cli() {
+test_sdk/cli() {
cd "$WORKSPACE/sdk/cli" \
&& mkdir -p /tmp/keep \
&& KEEP_LOCAL_STORE=/tmp/keep bundle exec rake test TESTOPTS=-v ${testargs[sdk/cli]}
}
-do_test sdk/cli cli
-test_login-sync() {
+test_services/login-sync() {
cd "$WORKSPACE/services/login-sync" \
&& bundle exec rake test TESTOPTS=-v ${testargs[services/login-sync]}
}
-do_test services/login-sync login-sync
-test_nodemanager_integration() {
+test_services/nodemanager_integration() {
cd "$WORKSPACE/services/nodemanager" \
&& tests/integration_test.py ${testargs[services/nodemanager_integration]}
}
-do_test services/nodemanager_integration nodemanager_integration
-for p in "${pythonstuff[@]}"
-do
- dir=${p%:py3}
- if [[ ${dir} = ${p} ]]; then
- if [[ -z ${skip[python2]} ]]; then
- do_test ${dir} pip
- fi
- elif [[ -n ${PYTHON3} ]]; then
- if [[ -z ${skip[python3]} ]]; then
- do_test ${dir} pip "$VENV3DIR/bin/"
- fi
- fi
-done
-
-for g in "${gostuff[@]}"
-do
- do_test "$g" go
-done
-
-test_workbench_units() {
+test_apps/workbench_units() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:units TESTOPTS=-v ${testargs[apps/workbench]}
}
-do_test apps/workbench_units workbench_units
-test_workbench_functionals() {
+test_apps/workbench_functionals() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:functionals TESTOPTS=-v ${testargs[apps/workbench]}
}
-do_test apps/workbench_functionals workbench_functionals
-test_workbench_integration() {
+test_apps/workbench_integration() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:integration TESTOPTS=-v ${testargs[apps/workbench]}
}
-do_test apps/workbench_integration workbench_integration
-
-test_workbench_benchmark() {
+test_apps/workbench_benchmark() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:benchmark ${testargs[apps/workbench_benchmark]}
}
-do_test apps/workbench_benchmark workbench_benchmark
-test_workbench_profile() {
+test_apps/workbench_profile() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:profile ${testargs[apps/workbench_profile]}
}
-do_test apps/workbench_profile workbench_profile
+install_deps() {
+ # Install parts needed by test suites
+ do_install env
+ do_install cmd/arvados-server go
+ do_install sdk/cli
+ do_install sdk/perl
+ do_install sdk/python pip
+ do_install sdk/ruby
+ do_install services/api
+ do_install services/arv-git-httpd go
+ do_install services/keepproxy go
+ do_install services/keepstore go
+ do_install services/keep-web go
+ do_install services/ws go
+}
+
+install_all() {
+ do_install env
+ do_install doc
+ do_install sdk/ruby
+ do_install sdk/R
+ do_install sdk/perl
+ do_install sdk/cli
+ do_install services/login-sync
+ for p in "${pythonstuff[@]}"
+ do
+ dir=${p%:py3}
+ if [[ ${dir} = ${p} ]]; then
+ if [[ -z ${skip[python2]} ]]; then
+ do_install ${dir} pip
+ fi
+ elif [[ -n ${PYTHON3} ]]; then
+ if [[ -z ${skip[python3]} ]]; then
+ do_install ${dir} pip "$VENV3DIR/bin/"
+ fi
+ fi
+ done
+ do_install services/api
+ for g in "${gostuff[@]}"
+ do
+ do_install "$g" go
+ done
+ do_install apps/workbench
+}
+
+test_all() {
+ stop_services
+ do_test services/api
+
+ # Shortcut for when we're only running apiserver tests. This saves a bit of time,
+ # because we don't need to start up the api server for subsequent tests.
+ if [ ! -z "$only" ] && [ "$only" == "services/api" ]; then
+ rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
+ exit_cleanly
+ fi
+
+ do_test doc
+ do_test sdk/ruby
+ do_test sdk/R
+ do_test sdk/cli
+ do_test services/login-sync
+ do_test services/nodemanager_integration
+ for p in "${pythonstuff[@]}"
+ do
+ dir=${p%:py3}
+ if [[ ${dir} = ${p} ]]; then
+ if [[ -z ${skip[python2]} ]]; then
+ do_test ${dir} pip
+ fi
+ elif [[ -n ${PYTHON3} ]]; then
+ if [[ -z ${skip[python3]} ]]; then
+ do_test ${dir} pip "$VENV3DIR/bin/"
+ fi
+ fi
+ done
+
+ for g in "${gostuff[@]}"
+ do
+ do_test "$g" go
+ done
+ do_test apps/workbench_units
+ do_test apps/workbench_functionals
+ do_test apps/workbench_integration
+ do_test apps/workbench_benchmark
+ do_test apps/workbench_profile
+}
+
+help_interactive() {
+ echo "== Interactive commands:"
+ echo "TARGET (short for 'test DIR')"
+ echo "test TARGET"
+ echo "test TARGET:py3 (test with python3)"
+ echo "test TARGET -check.vv (pass arguments to test)"
+ echo "install TARGET"
+ echo "install env (go/python libs)"
+ echo "install deps (go/python libs + arvados components needed for integration tests)"
+ echo "reset (...services used by integration tests)"
+ echo "exit"
+ echo "== Test targets:"
+ echo "${!testfuncargs[@]}" | tr ' ' '\n' | sort | column
+}
+
+initialize
+
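+# testfuncargs maps each go/python suite to the arguments its do_test/do_install call needs; targets not listed here fall back to the bare suite name in the interactive loop.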
+declare -A testfuncargs=()
+for g in "${gostuff[@]}"; do
+ testfuncargs[$g]="$g go"
+done
+for p in "${pythonstuff[@]}"; do
+ dir=${p%:py3}
+ if [[ ${dir} = ${p} ]]; then
+ testfuncargs[$p]="$dir pip $VENVDIR/bin/"
+ else
+ testfuncargs[$p]="$dir pip $VENV3DIR/bin/"
+ fi
+done
+
+if [[ -z ${interactive} ]]; then
+ install_all
+ test_all
+else
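+ # Interactive mode: drop any --skip/--only/--only-install selections from the command line; the prompt below decides what to run.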
+ skip=()
+ only=()
+ only_install=()
+ if [[ -e "$VENVDIR/bin/activate" ]]; then stop_services; fi
+ setnextcmd() {
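+ # Default suggestion for the prompt: keep the previous command, but replace the initial "install deps" with a quick test once the virtualenv already exists.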
+ if [[ "$nextcmd" != "install deps" ]]; then
+ :
+ elif [[ -e "$VENVDIR/bin/activate" ]]; then
+ nextcmd="test lib/cmd"
+ else
+ nextcmd="install deps"
+ fi
+ }
+ echo
+ help_interactive
+ nextcmd="install deps"
+ setnextcmd
+ while read -p 'What next? ' -e -i "${nextcmd}" nextcmd; do
+ read verb target opts <<<"${nextcmd}"
+ case "${verb}" in
+ "" | "help")
+ help_interactive
+ ;;
+ "exit" | "quit")
+ exit_cleanly
+ ;;
+ "reset")
+ stop_services
+ ;;
+ *)
+ target="${target%/}"
+ testargs["$target"]="${opts}"
+ case "$target" in
+ all | deps)
+ ${verb}_${target}
+ ;;
+ *)
+ tt="${testfuncargs[${target}]}"
+ tt="${tt:-$target}"
+ do_$verb $tt
+ ;;
+ esac
+ ;;
+ esac
+ if [[ ${#successes[@]} -gt 0 || ${#failures[@]} -gt 0 ]]; then
+ report_outcomes
+ successes=()
+ failures=()
+ fi
+ cd "$WORKSPACE"
+ setnextcmd
+ done
+ echo
+fi
exit_cleanly
ProbeInterval: arvados.Duration(5 * time.Millisecond),
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
MaxProbesPerSecond: 1000,
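+ // Very short TERM/signal timeouts so the test exercises crunch-run kill retries and the give-up path quickly.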
+ TimeoutSignal: arvados.Duration(3 * time.Millisecond),
+ TimeoutTERM: arvados.Duration(20 * time.Millisecond),
},
InstanceTypes: arvados.InstanceTypeMap{
test.InstanceType(1).Name: test.InstanceType(1),
for _, ctr := range queue.Containers {
waiting[ctr.UUID] = struct{}{}
}
- executeContainer := func(ctr arvados.Container) int {
+ finishContainer := func(ctr arvados.Container) {
mtx.Lock()
defer mtx.Unlock()
if _, ok := waiting[ctr.UUID]; !ok {
- c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
- return 1
+ c.Errorf("container completed twice: %s", ctr.UUID)
+ return
}
delete(waiting, ctr.UUID)
if len(waiting) == 0 {
close(done)
}
+ }
+ executeContainer := func(ctr arvados.Container) int {
+ finishContainer(ctr)
return int(rand.Uint32() & 0x3)
}
n := 0
stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
stubvm.ExecuteContainer = executeContainer
+ stubvm.CrashRunningContainer = finishContainer
switch n % 7 {
case 0:
stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
c.Check(ok, check.Equals, true)
<-ch
- sr = getInstances()
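+ // Instance creation is asynchronous; poll for up to a second until the new instance shows up in the list.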
+ for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
+ sr = getInstances()
+ if len(sr.Items) > 0 {
+ break
+ }
+ time.Sleep(time.Millisecond)
+ }
c.Assert(len(sr.Items), check.Equals, 1)
c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
Create(arvados.InstanceType) bool
Shutdown(arvados.InstanceType) bool
StartContainer(arvados.InstanceType, arvados.Container) bool
- KillContainer(uuid string)
+ KillContainer(uuid, reason string)
Subscribe() <-chan struct{}
Unsubscribe(<-chan struct{})
}
p.unalloc[it]++
return true
}
-func (p *stubPool) KillContainer(uuid string) {
+func (p *stubPool) KillContainer(uuid, reason string) {
p.Lock()
defer p.Unlock()
delete(p.running, uuid)
if !running {
go sch.cancel(uuid, "not running on any worker")
} else if !exited.IsZero() && qUpdated.After(exited) {
- go sch.cancel(uuid, "state=\"Running\" after crunch-run exited")
+ go sch.cancel(uuid, "state=Running after crunch-run exited")
} else if ent.Container.Priority == 0 {
go sch.kill(uuid, "priority=0")
}
// of kill() will be to make the
// worker available for the next
// container.
- go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
} else {
sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
// a network outage and is still
// preparing to run a container that
// has already been unlocked/requeued.
- go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
}
case arvados.ContainerStateLocked:
if running && !exited.IsZero() && qUpdated.After(exited) {
}
func (sch *Scheduler) kill(uuid string, reason string) {
- logger := sch.logger.WithField("ContainerUUID", uuid)
- logger.Debugf("killing crunch-run process because %s", reason)
- sch.pool.KillContainer(uuid)
+ sch.pool.KillContainer(uuid, reason)
}
func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
//
// The resulting changes are not exposed through Get() or Entries()
// until the next call to Update().
-func (q *Queue) Notify(upd arvados.Container) {
+//
+// Return value is true unless the update is rejected (invalid state
+// transition).
+func (q *Queue) Notify(upd arvados.Container) bool {
q.mtx.Lock()
defer q.mtx.Unlock()
for i, ctr := range q.Containers {
if ctr.UUID == upd.UUID {
- q.Containers[i] = upd
- return
+ if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+ q.Containers[i] = upd
+ return true
+ }
+ return false
}
}
q.Containers = append(q.Containers, upd)
+ return true
}
tags: copyTags(tags),
providerType: it.ProviderType,
initCommand: cmd,
+ running: map[string]int64{},
+ killing: map[string]bool{},
}
svm.SSHService = SSHService{
HostKey: sis.driver.HostKey,
// running (and might change IP addresses, shut down, etc.) without
// updating any stubInstances that have been returned to callers.
type StubVM struct {
- Boot time.Time
- Broken time.Time
- CrunchRunMissing bool
- CrunchRunCrashRate float64
- CrunchRunDetachDelay time.Duration
- ExecuteContainer func(arvados.Container) int
+ Boot time.Time
+ Broken time.Time
+ CrunchRunMissing bool
+ CrunchRunCrashRate float64
+ CrunchRunDetachDelay time.Duration
+ ExecuteContainer func(arvados.Container) int
+ CrashRunningContainer func(arvados.Container)
sis *StubInstanceSet
id cloud.InstanceID
initCommand cloud.InitCommand
providerType string
SSHService SSHService
- running map[string]bool
+ running map[string]int64
+ killing map[string]bool
+ lastPID int64
sync.Mutex
}
}
}
svm.Lock()
- if svm.running == nil {
- svm.running = map[string]bool{}
- }
- svm.running[uuid] = true
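+ // Track a fake, increasing PID for each crunch-run stub so later code can tell whether the process registered for this container is still the same one.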
+ svm.lastPID++
+ pid := svm.lastPID
+ svm.running[uuid] = pid
svm.Unlock()
time.Sleep(svm.CrunchRunDetachDelay)
fmt.Fprintf(stderr, "starting %s\n", uuid)
logger := svm.sis.logger.WithFields(logrus.Fields{
"Instance": svm.id,
"ContainerUUID": uuid,
+ "PID": pid,
})
logger.Printf("[test] starting crunch-run stub")
go func() {
logger.Print("[test] container not in queue")
return
}
+
+ defer func() {
+ if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil {
+ svm.CrashRunningContainer(ctr)
+ }
+ }()
+
if crashluck > svm.CrunchRunCrashRate/2 {
time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
ctr.State = arvados.ContainerStateRunning
- queue.Notify(ctr)
+ if !queue.Notify(ctr) {
+ ctr, _ = queue.Get(uuid)
+ logger.Print("[test] erroring out because state=Running update was rejected")
+ return
+ }
}
time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+
svm.Lock()
- _, running := svm.running[uuid]
- svm.Unlock()
- if !running {
+ defer svm.Unlock()
+ if svm.running[uuid] != pid {
logger.Print("[test] container was killed")
return
}
- if svm.ExecuteContainer != nil {
- ctr.ExitCode = svm.ExecuteContainer(ctr)
- }
- // TODO: Check whether the stub instance has
- // been destroyed, and if so, don't call
- // queue.Notify. Then "container finished
- // twice" can be classified as a bug.
+ delete(svm.running, uuid)
+
if crashluck < svm.CrunchRunCrashRate {
- logger.Print("[test] crashing crunch-run stub")
+ logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
} else {
+ if svm.ExecuteContainer != nil {
+ ctr.ExitCode = svm.ExecuteContainer(ctr)
+ }
+ logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub")
ctr.State = arvados.ContainerStateComplete
- queue.Notify(ctr)
+ go queue.Notify(ctr)
}
- logger.Print("[test] exiting crunch-run stub")
- svm.Lock()
- defer svm.Unlock()
- delete(svm.running, uuid)
}()
return 0
}
}
if strings.HasPrefix(command, "crunch-run --kill ") {
svm.Lock()
- defer svm.Unlock()
- if svm.running[uuid] {
- delete(svm.running, uuid)
+ pid, running := svm.running[uuid]
+ if running && !svm.killing[uuid] {
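+ // Simulate an asynchronous kill: the running entry is removed after a short random delay, so the first --kill attempt usually still sees the container running and returns nonzero.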
+ svm.killing[uuid] = true
+ go func() {
+ time.Sleep(time.Duration(math_rand.Float64()*30) * time.Millisecond)
+ svm.Lock()
+ defer svm.Unlock()
+ if svm.running[uuid] == pid {
+ // Kill only if the running entry
+ // hasn't since been killed and
+ // replaced with a different one.
+ delete(svm.running, uuid)
+ }
+ delete(svm.killing, uuid)
+ }()
+ svm.Unlock()
+ time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
+ svm.Lock()
+ _, running = svm.running[uuid]
+ }
+ svm.Unlock()
+ if running {
+ fmt.Fprintf(stderr, "%s: container is running\n", uuid)
+ return 1
} else {
fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+ return 0
}
- return 0
}
if command == "true" {
return 0
defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
defaultTimeoutShutdown = time.Second * 10
+ defaultTimeoutTERM = time.Minute * 2
+ defaultTimeoutSignal = time.Second * 5
// Time after a quota error to try again anyway, even if no
// instances have been shutdown.
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+ timeoutSignal: duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
installPublicKey: installPublicKey,
stop: make(chan bool),
}
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
installPublicKey ssh.PublicKey
// private state
if !ok {
return errors.New("requested instance does not exist")
}
- wkr.idleBehavior = idleBehavior
- wkr.saveTags()
- wkr.shutdownIfIdle()
+ wkr.setIdleBehavior(idleBehavior)
return nil
}
probed: now,
busy: now,
updated: now,
- running: make(map[string]struct{}),
- starting: make(map[string]struct{}),
+ running: make(map[string]*remoteRunner),
+ starting: make(map[string]*remoteRunner),
probing: make(chan struct{}, 1),
}
wp.workers[id] = wkr
return wkr, true
}
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
- wp.exited[uuid] = t
-}
-
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
+ logger := wp.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "Reason": reason,
+ })
if _, ok := wp.exited[uuid]; ok {
- wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+ logger.Debug("clearing placeholder for exited crunch-run process")
delete(wp.exited, uuid)
return
}
for _, wkr := range wp.workers {
- if _, ok := wkr.running[uuid]; ok {
- go wp.kill(wkr, uuid)
- return
+ rr := wkr.running[uuid]
+ if rr == nil {
+ rr = wkr.starting[uuid]
}
- }
- wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
- logger := wp.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "Instance": wkr.instance.ID(),
- })
- logger.Debug("killing process")
- cmd := "crunch-run --kill 15 " + uuid
- if u := wkr.instance.RemoteUser(); u != "root" {
- cmd = "sudo " + cmd
- }
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
- if err != nil {
- logger.WithFields(logrus.Fields{
- "stderr": string(stderr),
- "stdout": string(stdout),
- "error": err,
- }).Warn("kill failed")
- return
- }
- logger.Debug("killing process succeeded")
- wp.mtx.Lock()
- defer wp.mtx.Unlock()
- if _, ok := wkr.running[uuid]; ok {
- delete(wkr.running, uuid)
- if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
- wkr.state = StateIdle
+ if rr != nil {
+ rr.Kill(reason)
+ return
}
- wkr.updated = time.Now()
- go wp.notify()
}
+ logger.Debug("cannot kill: already disappeared")
}
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
})
logger.Info("instance disappeared in cloud")
delete(wp.workers, id)
- go wkr.executor.Close()
+ go wkr.Close()
notify = true
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "syscall"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
+)
+
+// remoteRunner handles the starting and stopping of a crunch-run
+// process on a remote machine.
+type remoteRunner struct {
+ uuid string
+ executor Executor
+ arvClient *arvados.Client
+ remoteUser string
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
+ onUnkillable func(uuid string) // callback invoked when giving up on SIGTERM
+ onKilled func(uuid string) // callback invoked when process exits after SIGTERM
+ logger logrus.FieldLogger
+
+ stopping bool // true if Stop() has been called
+ givenup bool // true if timeoutTERM has been reached
+ closed chan struct{} // channel is closed if Close() has been called
+}
+
+// newRemoteRunner returns a new remoteRunner. Caller should ensure
+// Close() is called to release resources.
+func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+ rr := &remoteRunner{
+ uuid: uuid,
+ executor: wkr.executor,
+ arvClient: wkr.wp.arvClient,
+ remoteUser: wkr.instance.RemoteUser(),
+ timeoutTERM: wkr.wp.timeoutTERM,
+ timeoutSignal: wkr.wp.timeoutSignal,
+ onUnkillable: wkr.onUnkillable,
+ onKilled: wkr.onKilled,
+ logger: wkr.logger.WithField("ContainerUUID", uuid),
+ closed: make(chan struct{}),
+ }
+ return rr
+}
+
+// Start a crunch-run process on the remote host.
+//
+// Start does not return any error encountered. The caller should
+// assume the remote process _might_ have started, at least until it
+// probes the worker and finds otherwise.
+func (rr *remoteRunner) Start() {
+ env := map[string]string{
+ "ARVADOS_API_HOST": rr.arvClient.APIHost,
+ "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
+ }
+ if rr.arvClient.Insecure {
+ env["ARVADOS_API_HOST_INSECURE"] = "1"
+ }
+ envJSON, err := json.Marshal(env)
+ if err != nil {
+ panic(err)
+ }
+ stdin := bytes.NewBuffer(envJSON)
+ cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+ if rr.remoteUser != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
+ if err != nil {
+ rr.logger.WithField("stdout", string(stdout)).
+ WithField("stderr", string(stderr)).
+ WithError(err).
+ Error("error starting crunch-run process")
+ return
+ }
+ rr.logger.Info("crunch-run process started")
+}
+
+// Close abandons the remote process (if any) and releases
+// resources. Close must not be called more than once.
+func (rr *remoteRunner) Close() {
+ close(rr.closed)
+}
+
+// Kill starts a background task to kill the remote process, first
+// trying SIGTERM until reaching timeoutTERM, then calling
+// onUnkillable().
+//
+// SIGKILL is not used. It would merely kill the crunch-run supervisor
+// and thereby make the docker container, arv-mount, etc. invisible to
+// us without actually stopping them.
+//
+// Once Kill has been called, calling it again has no effect.
+func (rr *remoteRunner) Kill(reason string) {
+ if rr.stopping {
+ return
+ }
+ rr.stopping = true
+ rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
+ go func() {
+ termDeadline := time.Now().Add(rr.timeoutTERM)
+ t := time.NewTicker(rr.timeoutSignal)
+ defer t.Stop()
+ for range t.C {
+ switch {
+ case rr.isClosed():
+ return
+ case time.Now().After(termDeadline):
+ rr.logger.Debug("giving up")
+ rr.givenup = true
+ rr.onUnkillable(rr.uuid)
+ return
+ default:
+ rr.kill(syscall.SIGTERM)
+ }
+ }
+ }()
+}
+
+func (rr *remoteRunner) kill(sig syscall.Signal) {
+ logger := rr.logger.WithField("Signal", int(sig))
+ logger.Info("sending signal")
+ cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+ if rr.remoteUser != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
+ if err != nil {
+ logger.WithFields(logrus.Fields{
+ "stderr": string(stderr),
+ "stdout": string(stdout),
+ "error": err,
+ }).Info("kill attempt unsuccessful")
+ return
+ }
+ rr.onKilled(rr.uuid)
+}
+
+func (rr *remoteRunner) isClosed() bool {
+ select {
+ case <-rr.closed:
+ return true
+ default:
+ return false
+ }
+}
import (
"bytes"
- "encoding/json"
"fmt"
"strings"
"sync"
busy time.Time
destroyed time.Time
lastUUID string
- running map[string]struct{} // remember to update state idle<->running when this changes
- starting map[string]struct{} // remember to update state idle<->running when this changes
+ running map[string]*remoteRunner // remember to update state idle<->running when this changes
+ starting map[string]*remoteRunner // remember to update state idle<->running when this changes
probing chan struct{}
}
+func (wkr *worker) onUnkillable(uuid string) {
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ logger := wkr.logger.WithField("ContainerUUID", uuid)
+ if wkr.idleBehavior == IdleBehaviorHold {
+ logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+ return
+ }
+ logger.Warn("unkillable container, draining worker")
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ wkr.closeRunner(uuid)
+ go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+ wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+ wkr.idleBehavior = idleBehavior
+ wkr.saveTags()
+ wkr.shutdownIfIdle()
+}
+
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
logger := wkr.logger.WithFields(logrus.Fields{
"ContainerUUID": ctr.UUID,
"Priority": ctr.Priority,
})
- logger = logger.WithField("Instance", wkr.instance.ID())
logger.Debug("starting container")
- wkr.starting[ctr.UUID] = struct{}{}
+ rr := newRemoteRunner(ctr.UUID, wkr)
+ wkr.starting[ctr.UUID] = rr
if wkr.state != StateRunning {
wkr.state = StateRunning
go wkr.wp.notify()
}
go func() {
- env := map[string]string{
- "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
- "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
- }
- if wkr.wp.arvClient.Insecure {
- env["ARVADOS_API_HOST_INSECURE"] = "1"
- }
- envJSON, err := json.Marshal(env)
- if err != nil {
- panic(err)
- }
- stdin := bytes.NewBuffer(envJSON)
- cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
- if u := wkr.instance.RemoteUser(); u != "root" {
- cmd = "sudo " + cmd
- }
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
+ rr.Start()
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
now := time.Now()
wkr.updated = now
wkr.busy = now
delete(wkr.starting, ctr.UUID)
- wkr.running[ctr.UUID] = struct{}{}
- wkr.lastUUID = ctr.UUID
- if err != nil {
- logger.WithField("stdout", string(stdout)).
- WithField("stderr", string(stderr)).
- WithError(err).
- Error("error starting crunch-run process")
- // Leave uuid in wkr.running, though: it's
- // possible the error was just a communication
- // failure and the process was in fact
- // started. Wait for next probe to find out.
- return
- }
- logger.Info("crunch-run process started")
+ wkr.running[ctr.UUID] = rr
wkr.lastUUID = ctr.UUID
}()
}
// advantage of the non-busy state, though.
wkr.busy = updateTime
}
- changed := false
- // Build a new "running" map. Set changed=true if it differs
- // from the existing map (wkr.running) to ensure the scheduler
- // gets notified below.
- running := map[string]struct{}{}
- for _, uuid := range ctrUUIDs {
- running[uuid] = struct{}{}
- if _, ok := wkr.running[uuid]; !ok {
- if _, ok := wkr.starting[uuid]; !ok {
- // We didn't start it -- it must have
- // been started by a previous
- // dispatcher process.
- logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
- }
- changed = true
- }
- }
- for uuid := range wkr.running {
- if _, ok := running[uuid]; !ok {
- logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
- wkr.wp.notifyExited(uuid, updateTime)
- changed = true
- }
- }
+ changed := wkr.updateRunning(ctrUUIDs)
// Update state if this was the first successful boot-probe.
if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
// Log whenever a run-probe reveals crunch-run processes
// appearing/disappearing before boot-probe succeeds.
- if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+ if wkr.state == StateUnknown && changed {
logger.WithFields(logrus.Fields{
- "RunningContainers": len(running),
+ "RunningContainers": len(wkr.running),
"State": wkr.state,
}).Info("crunch-run probe succeeded, but boot probe is still failing")
}
- wkr.running = running
if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
wkr.state = StateRunning
} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
wkr.updated = updateTime
if booted && (initialState == StateUnknown || initialState == StateBooting) {
logger.WithFields(logrus.Fields{
- "RunningContainers": len(running),
+ "RunningContainers": len(wkr.running),
"State": wkr.state,
}).Info("probes succeeded, instance is in service")
}
return true
}
+// Returns true if the instance is eligible for shutdown: either it's
+// been idle too long, or idleBehavior=Drain and nothing is running.
+//
// caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
if wkr.idleBehavior == IdleBehaviorHold {
- // Never shut down.
return false
}
- age := time.Since(wkr.busy)
-
- old := age >= wkr.wp.timeoutIdle
draining := wkr.idleBehavior == IdleBehaviorDrain
- shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
- (draining && wkr.state == StateBooting)
- if !shouldShutdown {
+ switch wkr.state {
+ case StateBooting:
+ return draining
+ case StateIdle:
+ return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+ case StateRunning:
+ if !draining {
+ return false
+ }
+ for _, rr := range wkr.running {
+ if !rr.givenup {
+ return false
+ }
+ }
+ for _, rr := range wkr.starting {
+ if !rr.givenup {
+ return false
+ }
+ }
+ // draining, and all remaining runners are just trying
+ // to force-kill their crunch-run procs
+ return true
+ default:
return false
}
+}
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+ if !wkr.eligibleForShutdown() {
+ return false
+ }
wkr.logger.WithFields(logrus.Fields{
"State": wkr.state,
- "IdleDuration": stats.Duration(age),
+ "IdleDuration": stats.Duration(time.Since(wkr.busy)),
"IdleBehavior": wkr.idleBehavior,
- }).Info("shutdown idle worker")
+ }).Info("shutdown worker")
wkr.shutdown()
return true
}
}()
}
}
+
+func (wkr *worker) Close() {
+ // This might take time, so do it after unlocking mtx.
+ defer wkr.executor.Close()
+
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ for uuid, rr := range wkr.running {
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+ rr.Close()
+ }
+ for uuid, rr := range wkr.starting {
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+ rr.Close()
+ }
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+ alive := map[string]bool{}
+ for _, uuid := range ctrUUIDs {
+ alive[uuid] = true
+ if _, ok := wkr.running[uuid]; ok {
+ // unchanged
+ } else if rr, ok := wkr.starting[uuid]; ok {
+ wkr.running[uuid] = rr
+ delete(wkr.starting, uuid)
+ changed = true
+ } else {
+ // We didn't start it -- it must have been
+ // started by a previous dispatcher process.
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+ wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+ changed = true
+ }
+ }
+ for uuid := range wkr.running {
+ if !alive[uuid] {
+ wkr.closeRunner(uuid)
+ changed = true
+ }
+ }
+ return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+ rr := wkr.running[uuid]
+ if rr == nil {
+ return
+ }
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+ delete(wkr.running, uuid)
+ rr.Close()
+
+ now := time.Now()
+ wkr.updated = now
+ wkr.wp.exited[uuid] = now
+ if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+ wkr.state = StateIdle
+ }
+}
busy: ctime,
probed: ctime,
updated: ctime,
+ running: map[string]*remoteRunner{},
+ starting: map[string]*remoteRunner{},
+ probing: make(chan struct{}, 1),
}
if trial.running > 0 {
- wkr.running = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+ uuid := "zzzzz-dz642-abcdefghijklmno"
+ wkr.running = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
}
if trial.starting > 0 {
- wkr.starting = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+ uuid := "zzzzz-dz642-bcdefghijklmnop"
+ wkr.starting = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
}
wkr.probeAndUpdate()
c.Check(wkr.state, check.Equals, trial.expectState)
// Maximum total worker probes per second
MaxProbesPerSecond int
+
+ // Time before repeating SIGTERM when killing a container
+ TimeoutSignal Duration
+
+ // Time to give up on SIGTERM and write off the worker
+ TimeoutTERM Duration
}
type CloudVMs struct {
proc, err := os.FindProcess(pi.PID)
if err != nil {
+ // FindProcess should have succeeded, even if the
+ // process does not exist.
return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
}
+ // Send the requested signal once, then send signal 0 a few
+ // times. When proc.Signal() returns an error (process no
+ // longer exists), return success. If that doesn't happen
+ // within 1 second, return an error.
err = proc.Signal(signal)
for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
err = proc.Signal(syscall.Signal(0))
}
if err == nil {
+ // Reached deadline without a proc.Signal() error.
return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
}
fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)