Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>
FROM debian:jessie
MAINTAINER Ward Vandewege <ward@curoverse.com>
+RUN perl -ni~ -e 'print unless /jessie-updates/' /etc/apt/sources.list
+
ENV DEBIAN_FRONTEND noninteractive
# Install dependencies.
FROM debian:8
MAINTAINER Ward Vandewege <wvandewege@veritasgenetics.com>
+RUN perl -ni~ -e 'print unless /jessie-updates/' /etc/apt/sources.list
+
ENV DEBIAN_FRONTEND noninteractive
# Install dependencies
fi
}
-title () {
- txt="********** $1 **********"
- printf "\n%*s%s\n\n" $((($COLUMNS-${#txt})/2)) "" "$txt"
+title() {
+ printf '%s %s\n' "=======" "$1"
}
checkexit() {
if [[ "$1" != "0" ]]; then
- title "!!!!!! $2 FAILED !!!!!!"
+ title "$2 -- FAILED"
failures+=("$2 (`timer`)")
else
successes+=("$2 (`timer`)")
if [[ ${#failures[@]} == 0 ]]
then
- echo "All test suites passed."
+ if [[ ${#successes[@]} != 0 ]]; then
+ echo "All test suites passed."
+ fi
else
echo "Failures (${#failures[@]}):"
for x in "${failures[@]}"
Options:
--skip FOO Do not test the FOO component.
+--skip sanity Skip initial dev environment sanity checks.
+--skip install Do not run any install steps. Just run tests.
+ You should provide GOPATH, GEMHOME, and VENVDIR options
+ from a previous invocation if you use this option.
--only FOO Do not test anything except the FOO component.
--temp DIR Install components and dependencies under DIR instead of
making a new temporary directory. Implies --leave-temp.
subsequent invocations.
--repeat N Repeat each install/test step until it succeeds N times.
--retry Prompt to retry if an install or test suite fails.
---skip-install Do not run any install steps. Just run tests.
- You should provide GOPATH, GEMHOME, and VENVDIR options
- from a previous invocation if you use this option.
--only-install Run specific install step
--short Skip (or scale down) some slow tests.
+--interactive Set up, then prompt for test/install steps to perform.
WORKSPACE=path Arvados source tree to test.
CONFIGSRC=path Dir with api server config files to copy into source tree.
(If none given, leave config files alone in source tree.)
envvar=value Set \$envvar to value. Primarily useful for WORKSPACE,
*_test, and other examples shown above.
-Assuming --skip-install is not given, all components are installed
+Assuming "--skip install" is not given, all components are installed
into \$GOPATH, \$VENVDIR, and \$GEMHOME before running any tests. Many
test suites depend on other components being installed, and installing
everything tends to be quicker than debugging dependencies.
lib/dispatchcloud/scheduler
lib/dispatchcloud/ssh_executor
lib/dispatchcloud/worker
+lib/service
services/api
services/arv-git-httpd
services/crunchstat
sdk/R
tools/sync-groups
tools/crunchstat-summary
+tools/crunchstat-summary:py3
tools/keep-exercise
tools/keep-rsync
tools/keep-block-check
exit_cleanly() {
trap - INT
- create-plot-data-from-log.sh $BUILD_NUMBER "$WORKSPACE/apps/workbench/log/test.log" "$WORKSPACE/apps/workbench/log/"
+ if which create-plot-data-from-log.sh >/dev/null; then
+ create-plot-data-from-log.sh $BUILD_NUMBER "$WORKSPACE/apps/workbench/log/test.log" "$WORKSPACE/apps/workbench/log/"
+ fi
rotate_logfile "$WORKSPACE/apps/workbench/log/" "test.log"
stop_services
rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
}
sanity_checks() {
+ [[ -n "${skip[sanity]}" ]] && return 0
( [[ -n "$WORKSPACE" ]] && [[ -d "$WORKSPACE/services" ]] ) \
|| fatal "WORKSPACE environment variable not set to a source directory (see: $0 --help)"
echo Checking dependencies:
--short)
short=1
;;
+ --interactive)
+ interactive=1
+ ;;
--skip-install)
- only_install=nothing
+ skip[install]=1
;;
--only-install)
only_install="$1"; shift
fi
start_services() {
+ if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
+ return 0
+ fi
+ . "$VENVDIR/bin/activate"
echo 'Starting API, keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
if [[ ! -d "$WORKSPACE/services/api/log" ]]; then
mkdir -p "$WORKSPACE/services/api/log"
if [[ -f "$WORKSPACE/tmp/api.pid" && ! -s "$WORKSPACE/tmp/api.pid" ]]; then
rm -f "$WORKSPACE/tmp/api.pid"
fi
+ all_services_stopped=
+ fail=0
cd "$WORKSPACE" \
- && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo fail=1) \
+ && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo "fail=1; false") \
&& export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
&& export ARVADOS_TEST_API_INSTALLED="$$" \
&& python sdk/python/tests/run_test_server.py start_controller \
&& python sdk/python/tests/run_test_server.py start_keep-web \
&& python sdk/python/tests/run_test_server.py start_arv-git-httpd \
&& python sdk/python/tests/run_test_server.py start_ws \
- && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo fail=1) \
- && (env | egrep ^ARVADOS)
- if [[ -n "$fail" ]]; then
- return 1
+ && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo "fail=1; false") \
+ && (env | egrep ^ARVADOS) \
+ || fail=1
+ deactivate
+ if [[ $fail != 0 ]]; then
+ unset ARVADOS_TEST_API_HOST
fi
+ return $fail
}
stop_services() {
- if [[ -z "$ARVADOS_TEST_API_HOST" ]]; then
+ if [[ -n "$all_services_stopped" ]]; then
return
fi
unset ARVADOS_TEST_API_HOST
+ . "$VENVDIR/bin/activate" || return
cd "$WORKSPACE" \
&& python sdk/python/tests/run_test_server.py stop_nginx \
&& python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
&& python sdk/python/tests/run_test_server.py stop_keep-web \
&& python sdk/python/tests/run_test_server.py stop_keep_proxy \
&& python sdk/python/tests/run_test_server.py stop_controller \
- && python sdk/python/tests/run_test_server.py stop
+ && python sdk/python/tests/run_test_server.py stop \
+ && all_services_stopped=1
+ deactivate
}
interrupt() {
}
trap interrupt INT
-sanity_checks
-
-echo "WORKSPACE=$WORKSPACE"
-
-if [[ -z "$CONFIGSRC" ]] && [[ -d "$HOME/arvados-api-server" ]]; then
- # Jenkins expects us to use this by default.
- CONFIGSRC="$HOME/arvados-api-server"
-fi
-
-# Clean up .pyc files that may exist in the workspace
-cd "$WORKSPACE"
-find -name '*.pyc' -delete
-
-if [[ -z "$temp" ]]; then
- temp="$(mktemp -d)"
-fi
-
-# Set up temporary install dirs (unless existing dirs were supplied)
-for tmpdir in VENVDIR VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
-do
- if [[ -z "${!tmpdir}" ]]; then
- eval "$tmpdir"="$temp/$tmpdir"
- fi
- if ! [[ -d "${!tmpdir}" ]]; then
- mkdir "${!tmpdir}" || fatal "can't create ${!tmpdir} (does $temp exist?)"
- fi
-done
-
-rm -vf "${WORKSPACE}/tmp/*.log"
-
setup_ruby_environment() {
if [[ -s "$HOME/.rvm/scripts/rvm" ]] ; then
source "$HOME/.rvm/scripts/rvm"
"$venvdest/bin/pip" install --no-cache-dir 'mock>=1.0' 'pbr<1.7.0'
}
-export PERLINSTALLBASE
-export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
+initialize() {
+ sanity_checks
-export R_LIBS
+ echo "WORKSPACE=$WORKSPACE"
-export GOPATH
-(
- set -e
- mkdir -p "$GOPATH/src/git.curoverse.com"
- if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
- for d in \
- "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
- "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
- "$GOPATH/src/git.curoverse.com/arvados.git"; do
- [[ -d "$d" ]] && rmdir "$d"
- done
+ if [[ -z "$CONFIGSRC" ]] && [[ -d "$HOME/arvados-api-server" ]]; then
+ # Jenkins expects us to use this by default.
+ CONFIGSRC="$HOME/arvados-api-server"
fi
- for d in \
- "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
- "$GOPATH/src/git.curoverse.com/arvados.git"; do
- [[ -h "$d" ]] && rm "$d"
- done
- ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
- go get -v github.com/kardianos/govendor
- cd "$GOPATH/src/git.curoverse.com/arvados.git"
- if [[ -n "$short" ]]; then
- go get -v -d ...
- "$GOPATH/bin/govendor" sync
- else
- # Remove cached source dirs in workdir. Otherwise, they will
- # not qualify as +missing or +external below, and we won't be
- # able to detect that they're missing from vendor/vendor.json.
- rm -rf vendor/*/
- go get -v -d ...
- "$GOPATH/bin/govendor" sync
- [[ -z $("$GOPATH/bin/govendor" list +unused +missing +external | tee /dev/stderr) ]] \
- || fatal "vendor/vendor.json has unused or missing dependencies -- try:
-(export GOPATH=\"${GOPATH}\"; cd \$GOPATH/src/git.curoverse.com/arvados.git && \$GOPATH/bin/govendor add +missing +external && \$GOPATH/bin/govendor remove +unused)
+ # Clean up .pyc files that may exist in the workspace
+ cd "$WORKSPACE"
+ find -name '*.pyc' -delete
-";
+ if [[ -z "$temp" ]]; then
+ temp="$(mktemp -d)"
fi
-) || fatal "Go setup failed"
-setup_virtualenv "$VENVDIR" --python python2.7
-. "$VENVDIR/bin/activate"
+ # Set up temporary install dirs (unless existing dirs were supplied)
+ for tmpdir in VENVDIR VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
+ do
+ if [[ -z "${!tmpdir}" ]]; then
+ eval "$tmpdir"="$temp/$tmpdir"
+ fi
+ if ! [[ -d "${!tmpdir}" ]]; then
+ mkdir "${!tmpdir}" || fatal "can't create ${!tmpdir} (does $temp exist?)"
+ fi
+ done
-# Needed for run_test_server.py which is used by certain (non-Python) tests.
-pip install --no-cache-dir PyYAML \
- || fatal "pip install PyYAML failed"
+ rm -vf "${WORKSPACE}/tmp/*.log"
-# Preinstall libcloud if using a fork; otherwise nodemanager "pip
-# install" won't pick it up by default.
-if [[ -n "$LIBCLOUD_PIN_SRC" ]]; then
- pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
- || pip install --pre --ignore-installed --no-cache-dir "$LIBCLOUD_PIN_SRC" >/dev/null \
- || fatal "pip install apache-libcloud failed"
-fi
+ export PERLINSTALLBASE
+ export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
-# Deactivate Python 2 virtualenv
-deactivate
+ export R_LIBS
-declare -a pythonstuff
-pythonstuff=(
- sdk/pam
- sdk/python
- sdk/python:py3
- sdk/cwl
- sdk/cwl:py3
- services/dockercleaner:py3
- services/fuse
- services/nodemanager
- tools/crunchstat-summary
- )
+ export GOPATH
-# If Python 3 is available, set up its virtualenv in $VENV3DIR.
-# Otherwise, skip dependent tests.
-PYTHON3=$(which python3)
-if [[ ${?} = 0 ]]; then
- setup_virtualenv "$VENV3DIR" --python python3
-else
- PYTHON3=
- cat >&2 <<EOF
+ # Jenkins config requires that glob tmp/*.log match something. Ensure
+ # that happens even if we don't end up running services that set up
+ # logging.
+ mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
+ touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
-Warning: python3 could not be found. Python 3 tests will be skipped.
+ unset http_proxy https_proxy no_proxy
-EOF
-fi
-# Reactivate Python 2 virtualenv
-. "$VENVDIR/bin/activate"
+ # Note: this must be the last time we change PATH, otherwise rvm will
+ # whine a lot.
+ setup_ruby_environment
+
+ echo "PATH is $PATH"
+}
+
+install_env() {
+ (
+ set -e
+ mkdir -p "$GOPATH/src/git.curoverse.com"
+ if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
+ for d in \
+ "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
+ "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
+ "$GOPATH/src/git.curoverse.com/arvados.git"; do
+ [[ -d "$d" ]] && rmdir "$d"
+ done
+ fi
+ for d in \
+ "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
+ "$GOPATH/src/git.curoverse.com/arvados.git"; do
+ [[ -h "$d" ]] && rm "$d"
+ done
+ ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+ go get -v github.com/kardianos/govendor
+ cd "$GOPATH/src/git.curoverse.com/arvados.git"
+ if [[ -n "$short" ]]; then
+ go get -v -d ...
+ "$GOPATH/bin/govendor" sync
+ else
+ # Remove cached source dirs in workdir. Otherwise, they will
+ # not qualify as +missing or +external below, and we won't be
+ # able to detect that they're missing from vendor/vendor.json.
+ rm -rf vendor/*/
+ go get -v -d ...
+ "$GOPATH/bin/govendor" sync
+ [[ -z $("$GOPATH/bin/govendor" list +unused +missing +external | tee /dev/stderr) ]] \
+ || fatal "vendor/vendor.json has unused or missing dependencies -- try:
+
+(export GOPATH=\"${GOPATH}\"; cd \$GOPATH/src/git.curoverse.com/arvados.git && \$GOPATH/bin/govendor add +missing +external && \$GOPATH/bin/govendor remove +unused)
-# Note: this must be the last time we change PATH, otherwise rvm will
-# whine a lot.
-setup_ruby_environment
+";
+ fi
+ ) || fatal "Go setup failed"
-echo "PATH is $PATH"
+ setup_virtualenv "$VENVDIR" --python python2.7
+ . "$VENVDIR/bin/activate"
-if ! which bundler >/dev/null
-then
- gem install --user-install bundler || fatal 'Could not install bundler'
-fi
+ # Needed for run_test_server.py which is used by certain (non-Python) tests.
+ pip install --no-cache-dir PyYAML \
+ || fatal "pip install PyYAML failed"
-# Jenkins config requires that glob tmp/*.log match something. Ensure
-# that happens even if we don't end up running services that set up
-# logging.
-mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
-touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
+ # Preinstall libcloud if using a fork; otherwise nodemanager "pip
+ # install" won't pick it up by default.
+ if [[ -n "$LIBCLOUD_PIN_SRC" ]]; then
+ pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
+ || pip install --pre --ignore-installed --no-cache-dir "$LIBCLOUD_PIN_SRC" >/dev/null \
+ || fatal "pip install apache-libcloud failed"
+ fi
+
+ # Deactivate Python 2 virtualenv
+ deactivate
+
+ # If Python 3 is available, set up its virtualenv in $VENV3DIR.
+ # Otherwise, skip dependent tests.
+ PYTHON3=$(which python3)
+ if [[ ${?} = 0 ]]; then
+ setup_virtualenv "$VENV3DIR" --python python3
+ else
+ PYTHON3=
+ cat >&2 <<EOF
+
+Warning: python3 could not be found. Python 3 tests will be skipped.
+
+EOF
+ fi
+
+ if ! which bundler >/dev/null
+ then
+ gem install --user-install bundler || fatal 'Could not install bundler'
+ fi
+}
retry() {
remain="${repeat}"
if ${@}; then
if [[ "$remain" -gt 1 ]]; then
remain=$((${remain}-1))
- title "Repeating ${remain} more times"
+ title "(repeating ${remain} more times)"
else
break
fi
suite="${1}"
;;
esac
- if [[ -z "${skip[$suite]}" && -z "${skip[$1]}" && \
- (${#only[@]} -eq 0 || ${only[$suite]} -eq 1 || \
- ${only[$1]} -eq 1) ||
- ${only[$2]} -eq 1 ]]; then
- retry do_test_once ${@}
- else
- title "Skipping ${1} tests"
+ if [[ -n "${skip[$suite]}" || \
+ -n "${skip[$1]}" || \
+ (${#only[@]} -ne 0 && ${only[$suite]} -eq 0 && ${only[$1]} -eq 0) ]]; then
+ return 0
fi
+ case "${1}" in
+ services/api)
+ stop_services
+ ;;
+ doc | lib/cli | lib/cloud/azure | lib/cloud/ec2 | lib/cmd | lib/dispatchcloud/ssh_executor | lib/dispatchcloud/worker)
+ # don't care whether services are running
+ ;;
+ *)
+ if ! start_services; then
+ title "test $1 -- failed to start services"
+ return 1
+ fi
+ ;;
+ esac
+ retry do_test_once ${@}
}
do_test_once() {
unset result
- title "Running $1 tests"
+ title "test $1"
timer_reset
- if [[ "$2" == "go" ]]
+
+ if which deactivate >/dev/null; then deactivate; fi
+ if ! . "$VENVDIR/bin/activate"
+ then
+ result=1
+ elif [[ "$2" == "go" ]]
then
covername="coverage-$(echo "$1" | sed -e 's/\//_/g')"
coverflags=("-covermode=count" "-coverprofile=$WORKSPACE/tmp/.$covername.tmp")
fi
result=${result:-$?}
checkexit $result "$1 tests"
- title "End of $1 tests (`timer`)"
+ title "test $1 -- `timer`"
return $result
}
do_install() {
- skipit=false
-
- if [[ -z "${only_install}" || "${only_install}" == "${1}" || "${only_install}" == "${2}" ]]; then
- retry do_install_once ${@}
- else
- skipit=true
- fi
-
- if [[ "$skipit" = true ]]; then
- title "Skipping $1 install"
- fi
+ if [[ -n "${skip[install]}" || ( -n "${only_install}" && "${only_install}" != "${1}" && "${only_install}" != "${2}" ) ]]; then
+ return 0
+ fi
+ retry do_install_once ${@}
}
do_install_once() {
- title "Running $1 install"
+ title "install $1"
timer_reset
- if [[ "$2" == "go" ]]
+
+ if which deactivate >/dev/null; then deactivate; fi
+ if [[ "$1" != "env" ]] && ! . "$VENVDIR/bin/activate"; then
+ result=1
+ elif [[ "$2" == "go" ]]
then
go get -ldflags "-X main.version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}" -t "git.curoverse.com/arvados.git/$1"
elif [[ "$2" == "pip" ]]
else
"install_$1"
fi
- result=$?
+ result=${result:-$?}
checkexit $result "$1 install"
- title "End of $1 install (`timer`)"
+ title "install $1 -- `timer`"
return $result
}
&& bundle_install_trylocal \
&& rm -rf .site
}
-do_install doc
install_gem() {
gemname=$1
&& with_test_gemset gem install --no-ri --no-rdoc $(ls -t "$gemname"-*.gem|head -n1)
}
-install_ruby_sdk() {
+install_sdk/ruby() {
install_gem arvados sdk/ruby
}
-do_install sdk/ruby ruby_sdk
-install_R_sdk() {
+install_sdk/R() {
if [[ "$NEED_SDK_R" = true ]]; then
cd "$WORKSPACE/sdk/R" \
&& Rscript --vanilla install_deps.R
fi
}
-do_install sdk/R R_sdk
-install_perl_sdk() {
+install_sdk/perl() {
cd "$WORKSPACE/sdk/perl" \
&& perl Makefile.PL INSTALL_BASE="$PERLINSTALLBASE" \
&& make install INSTALLDIRS=perl
}
-do_install sdk/perl perl_sdk
-install_cli() {
+install_sdk/cli() {
install_gem arvados-cli sdk/cli
}
-do_install sdk/cli cli
-install_login-sync() {
+install_services/login-sync() {
install_gem arvados-login-sync services/login-sync
}
-do_install services/login-sync login-sync
-
-# Install the Python SDK early. Various other test suites (like
-# keepproxy) bring up run_test_server.py, which imports the arvados
-# module. We can't actually *test* the Python SDK yet though, because
-# its own test suite brings up some of those other programs (like
-# keepproxy).
-for p in "${pythonstuff[@]}"
-do
- dir=${p%:py3}
- if [[ ${dir} = ${p} ]]; then
- if [[ -z ${skip[python2]} ]]; then
- do_install ${dir} pip
- fi
- elif [[ -n ${PYTHON3} ]]; then
- if [[ -z ${skip[python3]} ]]; then
- do_install ${dir} pip "$VENV3DIR/bin/"
- fi
- fi
-done
-install_apiserver() {
+install_services/api() {
cd "$WORKSPACE/services/api" \
    && RAILS_ENV=test bundle_install_trylocal \
&& RAILS_ENV=test bundle exec rake db:setup \
&& RAILS_ENV=test bundle exec rake db:fixtures:load
}
-do_install services/api apiserver
+
+declare -a pythonstuff
+pythonstuff=(
+ sdk/pam
+ sdk/python
+ sdk/python:py3
+ sdk/cwl
+ sdk/cwl:py3
+ services/dockercleaner:py3
+ services/fuse
+ services/nodemanager
+ tools/crunchstat-summary
+)
declare -a gostuff
gostuff=(
lib/dispatchcloud/scheduler
lib/dispatchcloud/ssh_executor
lib/dispatchcloud/worker
+ lib/service
sdk/go/arvados
sdk/go/arvadosclient
sdk/go/auth
tools/keep-rsync
tools/sync-groups
)
-for g in "${gostuff[@]}"
-do
- do_install "$g" go
-done
-install_workbench() {
+install_apps/workbench() {
cd "$WORKSPACE/apps/workbench" \
&& mkdir -p tmp/cache \
&& RAILS_ENV=test bundle_install_trylocal \
&& RAILS_ENV=test RAILS_GROUPS=assets bundle exec rake npm:install
}
-do_install apps/workbench workbench
-
-unset http_proxy https_proxy no_proxy
-test_doclinkchecker() {
+test_doc() {
(
set -e
cd "$WORKSPACE/doc"
PYTHONPATH=$WORKSPACE/sdk/python/ bundle exec rake linkchecker baseurl=file://$WORKSPACE/doc/.site/ arvados_workbench_host=https://workbench.$ARVADOS_API_HOST arvados_api_host=$ARVADOS_API_HOST
)
}
-do_test doc doclinkchecker
-
-stop_services
-test_apiserver() {
+test_services/api() {
rm -f "$WORKSPACE/services/api/git-commit.version"
cd "$WORKSPACE/services/api" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test TESTOPTS=-v ${testargs[services/api]}
}
-do_test services/api apiserver
-
-# Shortcut for when we're only running apiserver tests. This saves a bit of time,
-# because we don't need to start up the api server for subsequent tests.
-if [ ! -z "$only" ] && [ "$only" == "services/api" ]; then
- rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
- exit_cleanly
-fi
-start_services || { stop_services; fatal "start_services"; }
-
-test_ruby_sdk() {
+test_sdk/ruby() {
cd "$WORKSPACE/sdk/ruby" \
&& bundle exec rake test TESTOPTS=-v ${testargs[sdk/ruby]}
}
-do_test sdk/ruby ruby_sdk
-test_R_sdk() {
+test_sdk/R() {
if [[ "$NEED_SDK_R" = true ]]; then
cd "$WORKSPACE/sdk/R" \
&& Rscript --vanilla run_test.R
fi
}
-do_test sdk/R R_sdk
-
-test_cli() {
+test_sdk/cli() {
cd "$WORKSPACE/sdk/cli" \
&& mkdir -p /tmp/keep \
&& KEEP_LOCAL_STORE=/tmp/keep bundle exec rake test TESTOPTS=-v ${testargs[sdk/cli]}
}
-do_test sdk/cli cli
-test_login-sync() {
+test_services/login-sync() {
cd "$WORKSPACE/services/login-sync" \
&& bundle exec rake test TESTOPTS=-v ${testargs[services/login-sync]}
}
-do_test services/login-sync login-sync
-test_nodemanager_integration() {
+test_services/nodemanager_integration() {
cd "$WORKSPACE/services/nodemanager" \
&& tests/integration_test.py ${testargs[services/nodemanager_integration]}
}
-do_test services/nodemanager_integration nodemanager_integration
-
-for p in "${pythonstuff[@]}"
-do
- dir=${p%:py3}
- if [[ ${dir} = ${p} ]]; then
- if [[ -z ${skip[python2]} ]]; then
- do_test ${dir} pip
- fi
- elif [[ -n ${PYTHON3} ]]; then
- if [[ -z ${skip[python3]} ]]; then
- do_test ${dir} pip "$VENV3DIR/bin/"
- fi
- fi
-done
-for g in "${gostuff[@]}"
-do
- do_test "$g" go
-done
-
-test_workbench_units() {
+test_apps/workbench_units() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:units TESTOPTS=-v ${testargs[apps/workbench]}
}
-do_test apps/workbench_units workbench_units
-test_workbench_functionals() {
+test_apps/workbench_functionals() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:functionals TESTOPTS=-v ${testargs[apps/workbench]}
}
-do_test apps/workbench_functionals workbench_functionals
-test_workbench_integration() {
+test_apps/workbench_integration() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:integration TESTOPTS=-v ${testargs[apps/workbench]}
}
-do_test apps/workbench_integration workbench_integration
-
-test_workbench_benchmark() {
+test_apps/workbench_benchmark() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:benchmark ${testargs[apps/workbench_benchmark]}
}
-do_test apps/workbench_benchmark workbench_benchmark
-test_workbench_profile() {
+test_apps/workbench_profile() {
cd "$WORKSPACE/apps/workbench" \
&& env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:profile ${testargs[apps/workbench_profile]}
}
-do_test apps/workbench_profile workbench_profile
+install_deps() {
+ # Install parts needed by test suites
+ do_install env
+ do_install cmd/arvados-server go
+ do_install sdk/cli
+ do_install sdk/perl
+ do_install sdk/python pip
+ do_install sdk/ruby
+ do_install services/api
+ do_install services/arv-git-httpd go
+ do_install services/keepproxy go
+ do_install services/keepstore go
+ do_install services/keep-web go
+ do_install services/ws go
+}
+
+install_all() {
+ do_install env
+ do_install doc
+ do_install sdk/ruby
+ do_install sdk/R
+ do_install sdk/perl
+ do_install sdk/cli
+ do_install services/login-sync
+ for p in "${pythonstuff[@]}"
+ do
+ dir=${p%:py3}
+ if [[ ${dir} = ${p} ]]; then
+ if [[ -z ${skip[python2]} ]]; then
+ do_install ${dir} pip
+ fi
+ elif [[ -n ${PYTHON3} ]]; then
+ if [[ -z ${skip[python3]} ]]; then
+ do_install ${dir} pip "$VENV3DIR/bin/"
+ fi
+ fi
+ done
+ do_install services/api
+ for g in "${gostuff[@]}"
+ do
+ do_install "$g" go
+ done
+ do_install apps/workbench
+}
+
+test_all() {
+ stop_services
+ do_test services/api
+
+ # Shortcut for when we're only running apiserver tests. This saves a bit of time,
+ # because we don't need to start up the api server for subsequent tests.
+ if [ ! -z "$only" ] && [ "$only" == "services/api" ]; then
+ rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
+ exit_cleanly
+ fi
+
+ do_test doc
+ do_test sdk/ruby
+ do_test sdk/R
+ do_test sdk/cli
+ do_test services/login-sync
+ do_test services/nodemanager_integration
+ for p in "${pythonstuff[@]}"
+ do
+ dir=${p%:py3}
+ if [[ ${dir} = ${p} ]]; then
+ if [[ -z ${skip[python2]} ]]; then
+ do_test ${dir} pip
+ fi
+ elif [[ -n ${PYTHON3} ]]; then
+ if [[ -z ${skip[python3]} ]]; then
+ do_test ${dir} pip "$VENV3DIR/bin/"
+ fi
+ fi
+ done
+
+ for g in "${gostuff[@]}"
+ do
+ do_test "$g" go
+ done
+ do_test apps/workbench_units
+ do_test apps/workbench_functionals
+ do_test apps/workbench_integration
+ do_test apps/workbench_benchmark
+ do_test apps/workbench_profile
+}
+
+help_interactive() {
+ echo "== Interactive commands:"
+ echo "TARGET (short for 'test DIR')"
+ echo "test TARGET"
+ echo "test TARGET:py3 (test with python3)"
+ echo "test TARGET -check.vv (pass arguments to test)"
+ echo "install TARGET"
+ echo "install env (go/python libs)"
+ echo "install deps (go/python libs + arvados components needed for integration tests)"
+ echo "reset (...services used by integration tests)"
+ echo "exit"
+ echo "== Test targets:"
+ echo "${!testfuncargs[@]}" | tr ' ' '\n' | sort | column
+}
+
+initialize
+
+declare -A testfuncargs=()
+for g in "${gostuff[@]}"; do
+ testfuncargs[$g]="$g go"
+done
+for p in "${pythonstuff[@]}"; do
+ dir=${p%:py3}
+ testfuncargs[$dir]="$dir pip $VENVDIR/bin/"
+ testfuncargs[$dir:py3]="$dir pip $VENV3DIR/bin/"
+done
+
+if [[ -z ${interactive} ]]; then
+ install_all
+ test_all
+else
+ skip=()
+ only=()
+ only_install=()
+ if [[ -e "$VENVDIR/bin/activate" ]]; then stop_services; fi
+ setnextcmd() {
+ if [[ "$nextcmd" != "install deps" ]]; then
+ :
+ elif [[ -e "$VENVDIR/bin/activate" ]]; then
+ nextcmd="test lib/cmd"
+ else
+ nextcmd="install deps"
+ fi
+ }
+ echo
+ help_interactive
+ nextcmd="install deps"
+ setnextcmd
+ while read -p 'What next? ' -e -i "${nextcmd}" nextcmd; do
+ read verb target opts <<<"${nextcmd}"
+ target="${target%/}"
+ target="${target/\/:/:}"
+ if [[ -z "${target}" ]]; then
+ help_interactive
+ continue
+ fi
+ case "${verb}" in
+ "" | "help")
+ help_interactive
+ ;;
+ "exit" | "quit")
+ exit_cleanly
+ ;;
+ "reset")
+ stop_services
+ ;;
+ *)
+ testargs["$target"]="${opts}"
+ case "$target" in
+ all | deps)
+ ${verb}_${target}
+ ;;
+ *)
+ tt="${testfuncargs[${target}]}"
+ tt="${tt:-$target}"
+ do_$verb $tt
+ ;;
+ esac
+ ;;
+ esac
+ if [[ ${#successes[@]} -gt 0 || ${#failures[@]} -gt 0 ]]; then
+ report_outcomes
+ successes=()
+ failures=()
+ fi
+ cd "$WORKSPACE"
+ setnextcmd
+ done
+ echo
+fi
exit_cleanly
az.stopWg.Add(1)
defer az.stopWg.Done()
+ if instanceType.AddedScratch > 0 {
+ return nil, fmt.Errorf("cannot create instance type %q: driver does not implement non-zero AddedScratch (%d)", instanceType.Name, instanceType.AddedScratch)
+ }
+
name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
if err != nil {
return nil, err
func (ai *azureInstance) Address() string {
if ai.nic.IPConfigurations != nil &&
len(*ai.nic.IPConfigurations) > 0 &&
- (*ai.nic.IPConfigurations)[0].PrivateIPAddress != nil {
+ (*ai.nic.IPConfigurations)[0].InterfaceIPConfigurationPropertiesFormat != nil &&
+ (*ai.nic.IPConfigurations)[0].InterfaceIPConfigurationPropertiesFormat.PrivateIPAddress != nil {
return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
}
var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
-func newHandler(_ context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+func newHandler(_ context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile, _ string) service.Handler {
return &Handler{Cluster: cluster, NodeProfile: np}
}
},
}
node := s.cluster.NodeProfiles["*"]
- s.handler = newHandler(s.ctx, s.cluster, &node)
+ s.handler = newHandler(s.ctx, s.cluster, &node, "")
}
func (s *HandlerSuite) TearDownTest(c *check.C) {
"Keep-Alive": true,
"Proxy-Authenticate": true,
"Proxy-Authorization": true,
+ // this line makes gofmt 1.10 and 1.11 agree
"TE": true,
"Trailer": true,
"Transfer-Encoding": true, // *-Encoding headers interfer with Go's automatic compression/decompression
import (
"context"
+ "fmt"
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/service"
var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
-func newHandler(ctx context.Context, cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
- d := &dispatcher{Cluster: cluster, Context: ctx}
+func newHandler(ctx context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile, token string) service.Handler {
+ ac, err := arvados.NewClientFromConfig(cluster)
+ if err != nil {
+ return service.ErrorHandler(ctx, cluster, np, fmt.Errorf("error initializing client from cluster config: %s", err))
+ }
+ d := &dispatcher{
+ Cluster: cluster,
+ Context: ctx,
+ ArvClient: ac,
+ AuthToken: token,
+ }
go d.Start()
return d
}
// cache up to date.
type Queue struct {
logger logrus.FieldLogger
- reg *prometheus.Registry
chooseType typeChooser
client APIClient
// Arvados cluster's queue during Update, chooseType will be called to
// assign an appropriate arvados.InstanceType for the queue entry.
func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
- return &Queue{
+ cq := &Queue{
logger: logger,
- reg: reg,
chooseType: chooseType,
client: client,
current: map[string]QueueEnt{},
subscribers: map[<-chan struct{}]chan struct{}{},
}
+ if reg != nil {
+ go cq.runMetrics(reg)
+ }
+ return cq
}
// Subscribe returns a channel that becomes ready to receive when an
}
return results, nil
}
+
+func (cq *Queue) runMetrics(reg *prometheus.Registry) {
+ mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "queue_entries",
+ Help: "Number of active container entries in the controller database.",
+ }, []string{"state", "instance_type"})
+ reg.MustRegister(mEntries)
+
+ type entKey struct {
+ state arvados.ContainerState
+ inst string
+ }
+ count := map[entKey]int{}
+
+ ch := cq.Subscribe()
+ defer cq.Unsubscribe(ch)
+ for range ch {
+ for k := range count {
+ count[k] = 0
+ }
+ ents, _ := cq.Entries()
+ for _, ent := range ents {
+ count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+ }
+ for k, v := range count {
+ mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
+ }
+ }
+}
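
For reference, a standalone sketch (not part of this change) of the gauge pattern runMetrics uses: one arvados_dispatchcloud_queue_entries gauge per (state, instance_type) label pair, registered on a private registry and exposed over HTTP. The hard-coded counts and listen address are placeholders; in the real code the counts are recomputed from cq.Entries() each time the queue notifies its subscribers.

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	reg := prometheus.NewRegistry()
	mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "queue_entries",
		Help:      "Number of active container entries in the controller database.",
	}, []string{"state", "instance_type"})
	reg.MustRegister(mEntries)

	// Placeholder counts for illustration only; runMetrics derives these
	// from the queue's entries on every update.
	mEntries.WithLabelValues("Queued", "Standard_D1_v2").Set(3)
	mEntries.WithLabelValues("Running", "Standard_D2_v2").Set(1)

	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	http.ListenAndServe(":9999", nil)
}
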
scheduler.WorkerPool
Instances() []worker.InstanceView
SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
+ KillInstance(id cloud.InstanceID, reason string) error
Stop()
}
type dispatcher struct {
Cluster *arvados.Cluster
Context context.Context
+ ArvClient *arvados.Client
+ AuthToken string
InstanceSetID cloud.InstanceSetID
logger logrus.FieldLogger
}
func (disp *dispatcher) initialize() {
- arvClient := arvados.NewClientFromEnv()
+ disp.logger = ctxlog.FromContext(disp.Context)
+
+ disp.ArvClient.AuthToken = disp.AuthToken
+
if disp.InstanceSetID == "" {
- if strings.HasPrefix(arvClient.AuthToken, "v2/") {
- disp.InstanceSetID = cloud.InstanceSetID(strings.Split(arvClient.AuthToken, "/")[1])
+ if strings.HasPrefix(disp.AuthToken, "v2/") {
+ disp.InstanceSetID = cloud.InstanceSetID(strings.Split(disp.AuthToken, "/")[1])
} else {
// Use some other string unique to this token
// that doesn't reveal the token itself.
- disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(arvClient.AuthToken))))
+ disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(disp.AuthToken))))
}
}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
- disp.logger = ctxlog.FromContext(disp.Context)
if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
}
disp.instanceSet = instanceSet
disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
- disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
+ disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
+ disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, disp.ArvClient)
if disp.Cluster.ManagementToken == "" {
disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
ErrorLog: disp.logger,
})
disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
}
+// Management API: shutdown/destroy specified instance now.
+func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
+ id := cloud.InstanceID(r.FormValue("instance_id"))
+ if id == "" {
+ httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+ return
+ }
+ err := disp.pool.KillInstance(id, "via management API: "+r.FormValue("reason"))
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusNotFound)
+ return
+ }
+}
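
A hedged usage sketch for the new endpoint (not part of the patch): POST instance_id and reason as form values to /arvados/v1/dispatch/instances/kill. The host, instance ID, and token below are placeholders, and sending the cluster's ManagementToken as a Bearer token is an assumption about the deployment; per the handler above, the response is 200 on success, 400 if instance_id is missing, and 404 if the instance is unknown.

package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

func main() {
	form := url.Values{}
	form.Set("instance_id", "stub-instance-1") // placeholder instance ID
	form.Set("reason", "manual cleanup")
	req, err := http.NewRequest("POST",
		"https://dispatch.example.com/arvados/v1/dispatch/instances/kill",
		strings.NewReader(form.Encode()))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	req.Header.Set("Authorization", "Bearer example-management-token") // assumed auth scheme
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
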
+
func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
id := cloud.InstanceID(r.FormValue("instance_id"))
if id == "" {
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/ctxlog"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
s.cluster = &arvados.Cluster{
CloudVMs: arvados.CloudVMs{
- Driver: "test",
- SyncInterval: arvados.Duration(10 * time.Millisecond),
- TimeoutIdle: arvados.Duration(150 * time.Millisecond),
- TimeoutBooting: arvados.Duration(150 * time.Millisecond),
- TimeoutProbe: arvados.Duration(15 * time.Millisecond),
- TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ Driver: "test",
+ SyncInterval: arvados.Duration(10 * time.Millisecond),
+ TimeoutIdle: arvados.Duration(150 * time.Millisecond),
+ TimeoutBooting: arvados.Duration(150 * time.Millisecond),
+ TimeoutProbe: arvados.Duration(15 * time.Millisecond),
+ TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ MaxCloudOpsPerSecond: 500,
},
Dispatch: arvados.Dispatch{
PrivateKey: string(dispatchprivraw),
ProbeInterval: arvados.Duration(5 * time.Millisecond),
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
MaxProbesPerSecond: 1000,
+ TimeoutSignal: arvados.Duration(3 * time.Millisecond),
+ TimeoutTERM: arvados.Duration(20 * time.Millisecond),
},
InstanceTypes: arvados.InstanceTypeMap{
test.InstanceType(1).Name: test.InstanceType(1),
DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
},
},
+ Services: arvados.Services{
+ Controller: arvados.Service{ExternalURL: arvados.URL{Scheme: "https", Host: os.Getenv("ARVADOS_API_HOST")}},
+ },
}
+
+ arvClient, err := arvados.NewClientFromConfig(s.cluster)
+ c.Check(err, check.IsNil)
+
s.disp = &dispatcher{
- Cluster: s.cluster,
- Context: s.ctx,
+ Cluster: s.cluster,
+ Context: s.ctx,
+ ArvClient: arvClient,
+ AuthToken: arvadostest.AdminToken,
}
// Test cases can modify s.cluster before calling
// initialize(), and then modify private state before calling
for _, ctr := range queue.Containers {
waiting[ctr.UUID] = struct{}{}
}
- executeContainer := func(ctr arvados.Container) int {
+ finishContainer := func(ctr arvados.Container) {
mtx.Lock()
defer mtx.Unlock()
if _, ok := waiting[ctr.UUID]; !ok {
- c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
- return 1
+ c.Errorf("container completed twice: %s", ctr.UUID)
+ return
}
delete(waiting, ctr.UUID)
if len(waiting) == 0 {
close(done)
}
+ }
+ executeContainer := func(ctr arvados.Container) int {
+ finishContainer(ctr)
return int(rand.Uint32() & 0x3)
}
n := 0
stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
stubvm.ExecuteContainer = executeContainer
+ stubvm.CrashRunningContainer = finishContainer
switch n % 7 {
case 0:
stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
case 1:
stubvm.CrunchRunMissing = true
+ case 2:
+ stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
default:
stubvm.CrunchRunCrashRate = 0.1
}
c.Check(ok, check.Equals, true)
<-ch
- sr = getInstances()
+ for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
+ sr = getInstances()
+ if len(sr.Items) > 0 {
+ break
+ }
+ time.Sleep(time.Millisecond)
+ }
c.Assert(len(sr.Items), check.Equals, 1)
c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
import (
"fmt"
+ "time"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/cloud/azure"
"git.curoverse.com/arvados.git/lib/cloud/ec2"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
)
var drivers = map[string]cloud.Driver{
if !ok {
return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
}
- return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+ is, err := driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+ if maxops := cluster.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
+ is = &rateLimitedInstanceSet{
+ InstanceSet: is,
+ ticker: time.NewTicker(time.Second / time.Duration(maxops)),
+ }
+ }
+ return is, err
+}
+
+type rateLimitedInstanceSet struct {
+ cloud.InstanceSet
+ ticker *time.Ticker
+}
+
+func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+ <-is.ticker.C
+ inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
+ return &rateLimitedInstance{inst, is.ticker}, err
+}
+
+type rateLimitedInstance struct {
+ cloud.Instance
+ ticker *time.Ticker
+}
+
+func (inst *rateLimitedInstance) Destroy() error {
+ <-inst.ticker.C
+ return inst.Instance.Destroy()
}
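
As a worked note on the wrapper above: with MaxCloudOpsPerSecond set to 500 (the value used in the test config earlier in this patch), the shared ticker fires every 2ms, so consecutive Create/Destroy calls are spaced at least that far apart. A standalone sketch of the same throttling idea, with the op count and rate chosen only for illustration:

package main

import (
	"fmt"
	"time"
)

func main() {
	maxops := 500 // ops per second -> one tick every 2ms
	ticker := time.NewTicker(time.Second / time.Duration(maxops))
	defer ticker.Stop()
	start := time.Now()
	for i := 1; i <= 3; i++ {
		<-ticker.C // each simulated cloud op waits for the next tick
		fmt.Printf("op %d after %v\n", i, time.Since(start).Round(time.Millisecond))
	}
}
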
Create(arvados.InstanceType) bool
Shutdown(arvados.InstanceType) bool
StartContainer(arvados.InstanceType, arvados.Container) bool
- KillContainer(uuid string)
+ KillContainer(uuid, reason string)
Subscribe() <-chan struct{}
Unsubscribe(<-chan struct{})
}
p.unalloc[it]++
return true
}
-func (p *stubPool) KillContainer(uuid string) {
+func (p *stubPool) KillContainer(uuid, reason string) {
p.Lock()
defer p.Unlock()
delete(p.running, uuid)
if !running {
go sch.cancel(uuid, "not running on any worker")
} else if !exited.IsZero() && qUpdated.After(exited) {
- go sch.cancel(uuid, "state=\"Running\" after crunch-run exited")
+ go sch.cancel(uuid, "state=Running after crunch-run exited")
} else if ent.Container.Priority == 0 {
go sch.kill(uuid, "priority=0")
}
// of kill() will be to make the
// worker available for the next
// container.
- go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
} else {
sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
// a network outage and is still
// preparing to run a container that
// has already been unlocked/requeued.
- go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
}
case arvados.ContainerStateLocked:
if running && !exited.IsZero() && qUpdated.After(exited) {
}
func (sch *Scheduler) kill(uuid string, reason string) {
- logger := sch.logger.WithField("ContainerUUID", uuid)
- logger.Debugf("killing crunch-run process because %s", reason)
- sch.pool.KillContainer(uuid)
+ sch.pool.KillContainer(uuid, reason)
}
func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
//
// The resulting changes are not exposed through Get() or Entries()
// until the next call to Update().
-func (q *Queue) Notify(upd arvados.Container) {
+//
+// Return value is true unless the update is rejected (invalid state
+// transition).
+func (q *Queue) Notify(upd arvados.Container) bool {
q.mtx.Lock()
defer q.mtx.Unlock()
for i, ctr := range q.Containers {
if ctr.UUID == upd.UUID {
- q.Containers[i] = upd
- return
+ if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+ q.Containers[i] = upd
+ return true
+ }
+ return false
}
}
q.Containers = append(q.Containers, upd)
+ return true
}
tags: copyTags(tags),
providerType: it.ProviderType,
initCommand: cmd,
+ running: map[string]int64{},
+ killing: map[string]bool{},
}
svm.SSHService = SSHService{
HostKey: sis.driver.HostKey,
// running (and might change IP addresses, shut down, etc.) without
// updating any stubInstances that have been returned to callers.
type StubVM struct {
- Boot time.Time
- Broken time.Time
- CrunchRunMissing bool
- CrunchRunCrashRate float64
- CrunchRunDetachDelay time.Duration
- ExecuteContainer func(arvados.Container) int
+ Boot time.Time
+ Broken time.Time
+ ReportBroken time.Time
+ CrunchRunMissing bool
+ CrunchRunCrashRate float64
+ CrunchRunDetachDelay time.Duration
+ ExecuteContainer func(arvados.Container) int
+ CrashRunningContainer func(arvados.Container)
sis *StubInstanceSet
id cloud.InstanceID
initCommand cloud.InitCommand
providerType string
SSHService SSHService
- running map[string]bool
+ running map[string]int64
+ killing map[string]bool
+ lastPID int64
sync.Mutex
}
}
for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
if stdinKV[name] == "" {
- fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdin)
+ fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
return 1
}
}
svm.Lock()
- if svm.running == nil {
- svm.running = map[string]bool{}
- }
- svm.running[uuid] = true
+ svm.lastPID++
+ pid := svm.lastPID
+ svm.running[uuid] = pid
svm.Unlock()
time.Sleep(svm.CrunchRunDetachDelay)
fmt.Fprintf(stderr, "starting %s\n", uuid)
logger := svm.sis.logger.WithFields(logrus.Fields{
"Instance": svm.id,
"ContainerUUID": uuid,
+ "PID": pid,
})
logger.Printf("[test] starting crunch-run stub")
go func() {
logger.Print("[test] container not in queue")
return
}
+
+ defer func() {
+ if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil {
+ svm.CrashRunningContainer(ctr)
+ }
+ }()
+
if crashluck > svm.CrunchRunCrashRate/2 {
time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
ctr.State = arvados.ContainerStateRunning
- queue.Notify(ctr)
+ if !queue.Notify(ctr) {
+ ctr, _ = queue.Get(uuid)
+ logger.Print("[test] erroring out because state=Running update was rejected")
+ return
+ }
}
time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+
svm.Lock()
- _, running := svm.running[uuid]
- svm.Unlock()
- if !running {
+ defer svm.Unlock()
+ if svm.running[uuid] != pid {
logger.Print("[test] container was killed")
return
}
- if svm.ExecuteContainer != nil {
- ctr.ExitCode = svm.ExecuteContainer(ctr)
- }
- // TODO: Check whether the stub instance has
- // been destroyed, and if so, don't call
- // queue.Notify. Then "container finished
- // twice" can be classified as a bug.
+ delete(svm.running, uuid)
+
if crashluck < svm.CrunchRunCrashRate {
- logger.Print("[test] crashing crunch-run stub")
+ logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
} else {
+ if svm.ExecuteContainer != nil {
+ ctr.ExitCode = svm.ExecuteContainer(ctr)
+ }
+ logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub")
ctr.State = arvados.ContainerStateComplete
- queue.Notify(ctr)
+ go queue.Notify(ctr)
}
- logger.Print("[test] exiting crunch-run stub")
- svm.Lock()
- defer svm.Unlock()
- delete(svm.running, uuid)
}()
return 0
}
for uuid := range svm.running {
fmt.Fprintf(stdout, "%s\n", uuid)
}
+ if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
+ fmt.Fprintln(stdout, "broken")
+ }
return 0
}
if strings.HasPrefix(command, "crunch-run --kill ") {
svm.Lock()
- defer svm.Unlock()
- if svm.running[uuid] {
- delete(svm.running, uuid)
+ pid, running := svm.running[uuid]
+ if running && !svm.killing[uuid] {
+ svm.killing[uuid] = true
+ go func() {
+ time.Sleep(time.Duration(math_rand.Float64()*30) * time.Millisecond)
+ svm.Lock()
+ defer svm.Unlock()
+ if svm.running[uuid] == pid {
+ // Kill only if the running entry
+ // hasn't since been killed and
+ // replaced with a different one.
+ delete(svm.running, uuid)
+ }
+ delete(svm.killing, uuid)
+ }()
+ svm.Unlock()
+ time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
+ svm.Lock()
+ _, running = svm.running[uuid]
+ }
+ svm.Unlock()
+ if running {
+ fmt.Fprintf(stderr, "%s: container is running\n", uuid)
+ return 1
} else {
fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+ return 0
}
- return 0
}
if command == "true" {
return 0
defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
defaultTimeoutShutdown = time.Second * 10
+ defaultTimeoutTERM = time.Minute * 2
+ defaultTimeoutSignal = time.Second * 5
// Time after a quota error to try again anyway, even if no
// instances have been shutdown.
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+ timeoutSignal: duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
installPublicKey: installPublicKey,
stop: make(chan bool),
}
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
installPublicKey ssh.PublicKey
// private state
if !ok {
return errors.New("requested instance does not exist")
}
- wkr.idleBehavior = idleBehavior
- wkr.saveTags()
- wkr.shutdownIfIdle()
+ wkr.setIdleBehavior(idleBehavior)
return nil
}
probed: now,
busy: now,
updated: now,
- running: make(map[string]struct{}),
- starting: make(map[string]struct{}),
+ running: make(map[string]*remoteRunner),
+ starting: make(map[string]*remoteRunner),
probing: make(chan struct{}, 1),
}
wp.workers[id] = wkr
return wkr, true
}
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
- wp.exited[uuid] = t
-}
-
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
+ logger := wp.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "Reason": reason,
+ })
if _, ok := wp.exited[uuid]; ok {
- wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+ logger.Debug("clearing placeholder for exited crunch-run process")
delete(wp.exited, uuid)
return
}
for _, wkr := range wp.workers {
- if _, ok := wkr.running[uuid]; ok {
- go wp.kill(wkr, uuid)
- return
+ rr := wkr.running[uuid]
+ if rr == nil {
+ rr = wkr.starting[uuid]
}
- }
- wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
- logger := wp.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "Instance": wkr.instance.ID(),
- })
- logger.Debug("killing process")
- cmd := "crunch-run --kill 15 " + uuid
- if u := wkr.instance.RemoteUser(); u != "root" {
- cmd = "sudo " + cmd
- }
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
- if err != nil {
- logger.WithFields(logrus.Fields{
- "stderr": string(stderr),
- "stdout": string(stdout),
- "error": err,
- }).Warn("kill failed")
- return
- }
- logger.Debug("killing process succeeded")
- wp.mtx.Lock()
- defer wp.mtx.Unlock()
- if _, ok := wkr.running[uuid]; ok {
- delete(wkr.running, uuid)
- if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
- wkr.state = StateIdle
+ if rr != nil {
+ rr.Kill(reason)
+ return
}
- wkr.updated = time.Now()
- go wp.notify()
}
+ logger.Debug("cannot kill: already disappeared")
}
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
return r
}
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+ wkr, ok := wp.workers[id]
+ if !ok {
+ return errors.New("instance not found")
+ }
+ wkr.logger.WithField("Reason", reason).Info("shutting down")
+ wkr.shutdown()
+ return nil
+}
+
func (wp *Pool) setup() {
wp.creating = map[string]createCall{}
wp.exited = map[string]time.Time{}
})
logger.Info("instance disappeared in cloud")
delete(wp.workers, id)
- go wkr.executor.Close()
+ go wkr.Close()
notify = true
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "syscall"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
+)
+
+// remoteRunner handles the starting and stopping of a crunch-run
+// process on a remote machine.
+type remoteRunner struct {
+ uuid string
+ executor Executor
+ arvClient *arvados.Client
+ remoteUser string
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
+ onUnkillable func(uuid string) // callback invoked when giving up on SIGTERM
+ onKilled func(uuid string) // callback invoked when process exits after SIGTERM
+ logger logrus.FieldLogger
+
+ stopping bool // true if Stop() has been called
+ givenup bool // true if timeoutTERM has been reached
+ closed chan struct{} // channel is closed if Close() has been called
+}
+
+// newRemoteRunner returns a new remoteRunner. Caller should ensure
+// Close() is called to release resources.
+func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+ rr := &remoteRunner{
+ uuid: uuid,
+ executor: wkr.executor,
+ arvClient: wkr.wp.arvClient,
+ remoteUser: wkr.instance.RemoteUser(),
+ timeoutTERM: wkr.wp.timeoutTERM,
+ timeoutSignal: wkr.wp.timeoutSignal,
+ onUnkillable: wkr.onUnkillable,
+ onKilled: wkr.onKilled,
+ logger: wkr.logger.WithField("ContainerUUID", uuid),
+ closed: make(chan struct{}),
+ }
+ return rr
+}
+
+// Start a crunch-run process on the remote host.
+//
+// Start does not return any error encountered. The caller should
+// assume the remote process _might_ have started, at least until it
+// probes the worker and finds otherwise.
+func (rr *remoteRunner) Start() {
+ env := map[string]string{
+ "ARVADOS_API_HOST": rr.arvClient.APIHost,
+ "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
+ }
+ if rr.arvClient.Insecure {
+ env["ARVADOS_API_HOST_INSECURE"] = "1"
+ }
+ envJSON, err := json.Marshal(env)
+ if err != nil {
+ panic(err)
+ }
+ stdin := bytes.NewBuffer(envJSON)
+ cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+ if rr.remoteUser != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
+ if err != nil {
+ rr.logger.WithField("stdout", string(stdout)).
+ WithField("stderr", string(stderr)).
+ WithError(err).
+ Error("error starting crunch-run process")
+ return
+ }
+ rr.logger.Info("crunch-run process started")
+}
+
+// Close abandons the remote process (if any) and releases
+// resources. Close must not be called more than once.
+func (rr *remoteRunner) Close() {
+ close(rr.closed)
+}
+
+// Kill starts a background task to kill the remote process, first
+// trying SIGTERM until reaching timeoutTERM, then calling
+// onUnkillable().
+//
+// SIGKILL is not used. It would merely kill the crunch-run supervisor
+// and thereby make the docker container, arv-mount, etc. invisible to
+// us without actually stopping them.
+//
+// Once Kill has been called, calling it again has no effect.
+func (rr *remoteRunner) Kill(reason string) {
+ if rr.stopping {
+ return
+ }
+ rr.stopping = true
+ rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
+ go func() {
+ termDeadline := time.Now().Add(rr.timeoutTERM)
+ t := time.NewTicker(rr.timeoutSignal)
+ defer t.Stop()
+ for range t.C {
+ switch {
+ case rr.isClosed():
+ return
+ case time.Now().After(termDeadline):
+ rr.logger.Debug("giving up")
+ rr.givenup = true
+ rr.onUnkillable(rr.uuid)
+ return
+ default:
+ rr.kill(syscall.SIGTERM)
+ }
+ }
+ }()
+}
+
+func (rr *remoteRunner) kill(sig syscall.Signal) {
+ logger := rr.logger.WithField("Signal", int(sig))
+ logger.Info("sending signal")
+ cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+ if rr.remoteUser != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
+ if err != nil {
+ logger.WithFields(logrus.Fields{
+ "stderr": string(stderr),
+ "stdout": string(stdout),
+ "error": err,
+ }).Info("kill attempt unsuccessful")
+ return
+ }
+ rr.onKilled(rr.uuid)
+}
+
+func (rr *remoteRunner) isClosed() bool {
+ select {
+ case <-rr.closed:
+ return true
+ default:
+ return false
+ }
+}
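
For orientation, a standalone sketch of the escalation loop Kill runs in its goroutine: retry SIGTERM every timeoutSignal until timeoutTERM elapses, then give up and let onUnkillable drain the worker. The durations below are shortened placeholders for the defaults added above (TimeoutSignal 5s, TimeoutTERM 2m).

package main

import (
	"fmt"
	"time"
)

func main() {
	timeoutSignal := 50 * time.Millisecond // stands in for the 5s default
	timeoutTERM := 200 * time.Millisecond  // stands in for the 2m default
	deadline := time.Now().Add(timeoutTERM)
	t := time.NewTicker(timeoutSignal)
	defer t.Stop()
	for range t.C {
		if time.Now().After(deadline) {
			fmt.Println("giving up; draining worker") // onUnkillable path
			return
		}
		fmt.Println("sending SIGTERM") // kill(syscall.SIGTERM) path
	}
}
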
package worker
import (
- "bytes"
- "encoding/json"
"fmt"
"strings"
"sync"
busy time.Time
destroyed time.Time
lastUUID string
- running map[string]struct{} // remember to update state idle<->running when this changes
- starting map[string]struct{} // remember to update state idle<->running when this changes
+ running map[string]*remoteRunner // remember to update state idle<->running when this changes
+ starting map[string]*remoteRunner // remember to update state idle<->running when this changes
probing chan struct{}
}
+func (wkr *worker) onUnkillable(uuid string) {
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ logger := wkr.logger.WithField("ContainerUUID", uuid)
+ if wkr.idleBehavior == IdleBehaviorHold {
+ logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+ return
+ }
+ logger.Warn("unkillable container, draining worker")
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ wkr.closeRunner(uuid)
+ go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+ wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+ wkr.idleBehavior = idleBehavior
+ wkr.saveTags()
+ wkr.shutdownIfIdle()
+}
+
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
logger := wkr.logger.WithFields(logrus.Fields{
"ContainerUUID": ctr.UUID,
"Priority": ctr.Priority,
})
- logger = logger.WithField("Instance", wkr.instance.ID())
logger.Debug("starting container")
- wkr.starting[ctr.UUID] = struct{}{}
+ rr := newRemoteRunner(ctr.UUID, wkr)
+ wkr.starting[ctr.UUID] = rr
if wkr.state != StateRunning {
wkr.state = StateRunning
go wkr.wp.notify()
}
go func() {
- env := map[string]string{
- "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
- "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
- }
- if wkr.wp.arvClient.Insecure {
- env["ARVADOS_API_HOST_INSECURE"] = "1"
- }
- envJSON, err := json.Marshal(env)
- if err != nil {
- panic(err)
- }
- stdin := bytes.NewBuffer(envJSON)
- cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
- if u := wkr.instance.RemoteUser(); u != "root" {
- cmd = "sudo " + cmd
- }
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
+ rr.Start()
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
now := time.Now()
wkr.updated = now
wkr.busy = now
delete(wkr.starting, ctr.UUID)
- wkr.running[ctr.UUID] = struct{}{}
- wkr.lastUUID = ctr.UUID
- if err != nil {
- logger.WithField("stdout", string(stdout)).
- WithField("stderr", string(stderr)).
- WithError(err).
- Error("error starting crunch-run process")
- // Leave uuid in wkr.running, though: it's
- // possible the error was just a communication
- // failure and the process was in fact
- // started. Wait for next probe to find out.
- return
- }
- logger.Info("crunch-run process started")
+ wkr.running[ctr.UUID] = rr
wkr.lastUUID = ctr.UUID
}()
}
logger.Info("instance booted; will try probeRunning")
}
}
+ reportedBroken := false
if booted || wkr.state == StateUnknown {
- ctrUUIDs, ok = wkr.probeRunning()
+ ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
}
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
+ if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+ logger.Info("probe reported broken instance")
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+ }
if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
if wkr.state == StateShutdown && wkr.updated.After(updated) {
// Skip the logging noise if shutdown was
// advantage of the non-busy state, though.
wkr.busy = updateTime
}
- changed := false
- // Build a new "running" map. Set changed=true if it differs
- // from the existing map (wkr.running) to ensure the scheduler
- // gets notified below.
- running := map[string]struct{}{}
- for _, uuid := range ctrUUIDs {
- running[uuid] = struct{}{}
- if _, ok := wkr.running[uuid]; !ok {
- if _, ok := wkr.starting[uuid]; !ok {
- // We didn't start it -- it must have
- // been started by a previous
- // dispatcher process.
- logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
- }
- changed = true
- }
- }
- for uuid := range wkr.running {
- if _, ok := running[uuid]; !ok {
- logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
- wkr.wp.notifyExited(uuid, updateTime)
- changed = true
- }
- }
+ changed := wkr.updateRunning(ctrUUIDs)
// Update state if this was the first successful boot-probe.
if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
// Log whenever a run-probe reveals crunch-run processes
// appearing/disappearing before boot-probe succeeds.
- if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+ if wkr.state == StateUnknown && changed {
logger.WithFields(logrus.Fields{
- "RunningContainers": len(running),
+ "RunningContainers": len(wkr.running),
"State": wkr.state,
}).Info("crunch-run probe succeeded, but boot probe is still failing")
}
- wkr.running = running
if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
wkr.state = StateRunning
} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
wkr.updated = updateTime
if booted && (initialState == StateUnknown || initialState == StateBooting) {
logger.WithFields(logrus.Fields{
- "RunningContainers": len(running),
+ "RunningContainers": len(wkr.running),
"State": wkr.state,
}).Info("probes succeeded, instance is in service")
}
go wkr.wp.notify()
}
-func (wkr *worker) probeRunning() (running []string, ok bool) {
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
cmd := "crunch-run --list"
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
"stdout": string(stdout),
"stderr": string(stderr),
}).WithError(err).Warn("probe failed")
- return nil, false
+ return
}
- stdout = bytes.TrimRight(stdout, "\n")
- if len(stdout) == 0 {
- return nil, true
+ ok = true
+ for _, s := range strings.Split(string(stdout), "\n") {
+ if s == "broken" {
+ reportsBroken = true
+ } else if s != "" {
+ running = append(running, s)
+ }
}
- return strings.Split(string(stdout), "\n"), true
+ return
}
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
return true
}
+// Returns true if the instance is eligible for shutdown: either it's
+// been idle too long, or idleBehavior=Drain and nothing is running.
+//
// caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
if wkr.idleBehavior == IdleBehaviorHold {
- // Never shut down.
return false
}
- age := time.Since(wkr.busy)
-
- old := age >= wkr.wp.timeoutIdle
draining := wkr.idleBehavior == IdleBehaviorDrain
- shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
- (draining && wkr.state == StateBooting)
- if !shouldShutdown {
+ switch wkr.state {
+ case StateBooting:
+ return draining
+ case StateIdle:
+ return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+ case StateRunning:
+ if !draining {
+ return false
+ }
+ for _, rr := range wkr.running {
+ if !rr.givenup {
+ return false
+ }
+ }
+ for _, rr := range wkr.starting {
+ if !rr.givenup {
+ return false
+ }
+ }
+ // draining, and all remaining runners are just trying
+ // to force-kill their crunch-run procs
+ return true
+ default:
return false
}
+}
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+ if !wkr.eligibleForShutdown() {
+ return false
+ }
wkr.logger.WithFields(logrus.Fields{
"State": wkr.state,
- "IdleDuration": stats.Duration(age),
+ "IdleDuration": stats.Duration(time.Since(wkr.busy)),
"IdleBehavior": wkr.idleBehavior,
- }).Info("shutdown idle worker")
+ }).Info("shutdown worker")
wkr.shutdown()
return true
}
}()
}
}
+
+func (wkr *worker) Close() {
+ // This might take time, so do it after unlocking mtx.
+ defer wkr.executor.Close()
+
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ for uuid, rr := range wkr.running {
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+ rr.Close()
+ }
+ for uuid, rr := range wkr.starting {
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+ rr.Close()
+ }
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+ alive := map[string]bool{}
+ for _, uuid := range ctrUUIDs {
+ alive[uuid] = true
+ if _, ok := wkr.running[uuid]; ok {
+ // unchanged
+ } else if rr, ok := wkr.starting[uuid]; ok {
+ wkr.running[uuid] = rr
+ delete(wkr.starting, uuid)
+ changed = true
+ } else {
+ // We didn't start it -- it must have been
+ // started by a previous dispatcher process.
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+ wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+ changed = true
+ }
+ }
+ for uuid := range wkr.running {
+ if !alive[uuid] {
+ wkr.closeRunner(uuid)
+ changed = true
+ }
+ }
+ return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+ rr := wkr.running[uuid]
+ if rr == nil {
+ return
+ }
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+ delete(wkr.running, uuid)
+ rr.Close()
+
+ now := time.Now()
+ wkr.updated = now
+ wkr.wp.exited[uuid] = now
+ if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+ wkr.state = StateIdle
+ }
+}
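updateRunning above reconciles the worker's in-memory map against a probe result: entries still reported are kept, entries found in starting are promoted, containers started by a previous dispatcher get a fresh runner, and anything no longer reported is closed. A reduced sketch of the same reconciliation shape, using a plain bool map instead of *remoteRunner (illustrative only):

    package main

    import "fmt"

    // reconcile updates current so its keys match the reported set,
    // returning true if anything was added or removed.
    func reconcile(current map[string]bool, reported []string) (changed bool) {
        alive := map[string]bool{}
        for _, id := range reported {
            alive[id] = true
            if !current[id] {
                current[id] = true // e.g. started by a previous dispatcher
                changed = true
            }
        }
        for id := range current {
            if !alive[id] {
                delete(current, id) // e.g. crunch-run process ended
                changed = true
            }
        }
        return
    }

    func main() {
        cur := map[string]bool{"a": true, "b": true}
        fmt.Println(reconcile(cur, []string{"b", "c"}), cur) // true map[b:true c:true]
    }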
busy: ctime,
probed: ctime,
updated: ctime,
+ running: map[string]*remoteRunner{},
+ starting: map[string]*remoteRunner{},
+ probing: make(chan struct{}, 1),
}
if trial.running > 0 {
- wkr.running = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+ uuid := "zzzzz-dz642-abcdefghijklmno"
+ wkr.running = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
}
if trial.starting > 0 {
- wkr.starting = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+ uuid := "zzzzz-dz642-bcdefghijklmnop"
+ wkr.starting = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
}
wkr.probeAndUpdate()
c.Check(wkr.state, check.Equals, trial.expectState)
"fmt"
"io"
"net/http"
+ "net/url"
"os"
"git.curoverse.com/arvados.git/lib/cmd"
CheckHealth() error
}
-type NewHandlerFunc func(context.Context, *arvados.Cluster, *arvados.NodeProfile) Handler
+type NewHandlerFunc func(_ context.Context, _ *arvados.Cluster, _ *arvados.NodeProfile, token string) Handler
type command struct {
newHandler NewHandlerFunc
svcName arvados.ServiceName
+ ctx context.Context // enables tests to shutdown service; no public API yet
}
// Command returns a cmd.Handler that loads site config, calls
return &command{
newHandler: newHandler,
svcName: svcName,
+ ctx: context.Background(),
}
}
log = ctxlog.New(stderr, cluster.Logging.Format, cluster.Logging.Level).WithFields(logrus.Fields{
"PID": os.Getpid(),
})
- ctx := ctxlog.Context(context.Background(), log)
+ ctx := ctxlog.Context(c.ctx, log)
+
profileName := *nodeProfile
if profileName == "" {
profileName = os.Getenv("ARVADOS_NODE_PROFILE")
err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName)
return 1
}
- handler := c.newHandler(ctx, cluster, profile)
+
+ if cluster.SystemRootToken == "" {
+ log.Warn("SystemRootToken missing from cluster config, falling back to ARVADOS_API_TOKEN environment variable")
+ cluster.SystemRootToken = os.Getenv("ARVADOS_API_TOKEN")
+ }
+ if cluster.Services.Controller.ExternalURL.Host == "" {
+ log.Warn("Services.Controller.ExternalURL missing from cluster config, falling back to ARVADOS_API_HOST(_INSECURE) environment variables")
+ u, uerr := url.Parse("https://" + os.Getenv("ARVADOS_API_HOST"))
+ if uerr != nil {
+ err = fmt.Errorf("ARVADOS_API_HOST: %s", uerr)
+ return 1
+ }
+ cluster.Services.Controller.ExternalURL = arvados.URL(*u)
+ if i := os.Getenv("ARVADOS_API_HOST_INSECURE"); i != "" && i != "0" {
+ cluster.TLS.Insecure = true
+ }
+ }
+
+ handler := c.newHandler(ctx, cluster, profile, cluster.SystemRootToken)
if err = handler.CheckHealth(); err != nil {
return 1
}
if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.WithError(err).Errorf("error notifying init daemon")
}
+ go func() {
+ <-ctx.Done()
+ srv.Close()
+ }()
err = srv.Wait()
if err != nil {
return 1
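The goroutine added above ties the server's lifetime to the command's context: once the context is cancelled (as the new test below does), srv.Close() makes the wait return and RunCommand exits. A minimal sketch of the same wiring using the standard library's http.Server rather than the Arvados httpserver type (hypothetical, for illustration only):

    package main

    import (
        "context"
        "fmt"
        "net/http"
        "time"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        srv := &http.Server{Addr: "127.0.0.1:0", Handler: http.NotFoundHandler()}

        // Close the server as soon as the context is cancelled.
        go func() {
            <-ctx.Done()
            srv.Close()
        }()

        // Simulate a caller (or test) cancelling after a moment.
        go func() {
            time.Sleep(100 * time.Millisecond)
            cancel()
        }()

        // Returns http.ErrServerClosed once Close() has been called.
        fmt.Println(srv.ListenAndServe())
    }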
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package service provides a cmd.Handler that brings up a system service.
+package service
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "testing"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (*Suite) TestCommand(c *check.C) {
+ cf, err := ioutil.TempFile("", "cmd_test.")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(cf.Name())
+ defer cf.Close()
+ fmt.Fprintf(cf, "Clusters:\n zzzzz:\n SystemRootToken: abcde\n NodeProfiles: {\"*\": {\"arvados-controller\": {Listen: \":1234\"}}}")
+
+ healthCheck := make(chan bool, 1)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, _ *arvados.NodeProfile, token string) Handler {
+ c.Check(ctx.Value("foo"), check.Equals, "bar")
+ c.Check(token, check.Equals, "abcde")
+ return &testHandler{ctx: ctx, healthCheck: healthCheck}
+ })
+ cmd.(*command).ctx = context.WithValue(ctx, "foo", "bar")
+
+ done := make(chan bool)
+ var stdin, stdout, stderr bytes.Buffer
+
+ go func() {
+ cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
+ close(done)
+ }()
+ select {
+ case <-healthCheck:
+ case <-done:
+ c.Error("command exited without health check")
+ }
+ cancel()
+ c.Check(stdout.String(), check.Equals, "")
+ c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
+}
+
+type testHandler struct {
+ ctx context.Context
+ healthCheck chan bool
+}
+
+func (th *testHandler) ServeHTTP(http.ResponseWriter, *http.Request) {}
+func (th *testHandler) CheckHealth() error {
+ ctxlog.FromContext(th.ctx).Info("CheckHealth called")
+ select {
+ case th.healthCheck <- true:
+ default:
+ }
+ return nil
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package service
+
+import (
+ "context"
+ "net/http"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "github.com/sirupsen/logrus"
+)
+
+// ErrorHandler returns a Handler that reports itself as unhealthy and
+// responds 500 to all requests. ErrorHandler itself logs the given
+// error once, and the handler logs it again for each incoming
+// request.
+func ErrorHandler(ctx context.Context, _ *arvados.Cluster, _ *arvados.NodeProfile, err error) Handler {
+ logger := ctxlog.FromContext(ctx)
+ logger.WithError(err).Error("unhealthy service")
+ return errorHandler{err, logger}
+}
+
+type errorHandler struct {
+ err error
+ logger logrus.FieldLogger
+}
+
+func (eh errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ eh.logger.WithError(eh.err).Error("unhealthy service")
+ http.Error(w, "", http.StatusInternalServerError)
+}
+
+func (eh errorHandler) CheckHealth() error {
+ return eh.err
+}
var DefaultSecureClient = &http.Client{
Timeout: 5 * time.Minute}
+// NewClientFromConfig creates a new Client that uses the endpoints in
+// the given cluster.
+//
+// AuthToken is left empty for the caller to populate.
+func NewClientFromConfig(cluster *Cluster) (*Client, error) {
+ ctrlURL := cluster.Services.Controller.ExternalURL
+ if ctrlURL.Host == "" {
+ return nil, fmt.Errorf("no host in config Services.Controller.ExternalURL: %v", ctrlURL)
+ }
+ return &Client{
+ APIHost: ctrlURL.Host,
+ Insecure: cluster.TLS.Insecure,
+ }, nil
+}
+
// NewClientFromEnv creates a new Client that uses the default HTTP
// client with the API endpoint and credentials given by the
// ARVADOS_API_* environment variables.
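NewClientFromConfig above deliberately leaves AuthToken empty; the service command fills in cluster.SystemRootToken when constructing its handler. A hedged sketch of how a caller might combine the two (the helper name is hypothetical; the import path matches the one used elsewhere in this diff):

    package main

    import (
        "fmt"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    // systemClient is a hypothetical helper: build a client from the
    // cluster's Services.Controller.ExternalURL and authenticate it
    // with the cluster-wide SystemRootToken.
    func systemClient(cluster *arvados.Cluster) (*arvados.Client, error) {
        client, err := arvados.NewClientFromConfig(cluster)
        if err != nil {
            return nil, err
        }
        client.AuthToken = cluster.SystemRootToken
        return client, nil
    }

    func main() {
        cluster := &arvados.Cluster{SystemRootToken: "xxxxx"}
        _, err := systemClient(cluster)
        fmt.Println(err) // errors here: ExternalURL is not set in this sketch
    }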
"encoding/json"
"errors"
"fmt"
+ "net/url"
"os"
"git.curoverse.com/arvados.git/sdk/go/config"
type Cluster struct {
ClusterID string `json:"-"`
ManagementToken string
+ SystemRootToken string
+ Services Services
NodeProfiles map[string]NodeProfile
InstanceTypes InstanceTypeMap
CloudVMs CloudVMs
PostgreSQL PostgreSQL
RequestLimits RequestLimits
Logging Logging
+ TLS TLS
}
+type Services struct {
+ Controller Service
+ DispatchCloud Service
+ Health Service
+ Keepbalance Service
+ Keepproxy Service
+ Keepstore Service
+ Keepweb Service
+ Nodemanager Service
+ RailsAPI Service
+ Websocket Service
+ Workbench Service
+}
+
+type Service struct {
+ InternalURLs map[URL]ServiceInstance
+ ExternalURL URL
+}
+
+// URL is a url.URL that is also usable as a JSON key/value.
+type URL url.URL
+
+// UnmarshalText implements encoding.TextUnmarshaler so URL can be
+// used as a JSON key/value.
+func (su *URL) UnmarshalText(text []byte) error {
+ u, err := url.Parse(string(text))
+ if err == nil {
+ *su = URL(*u)
+ }
+ return err
+}
+
+type ServiceInstance struct{}
+
type Logging struct {
Level string
Format string
// Maximum total worker probes per second
MaxProbesPerSecond int
+
+ // Time before repeating SIGTERM when killing a container
+ TimeoutSignal Duration
+
+ // Time to give up on SIGTERM and write off the worker
+ TimeoutTERM Duration
}
type CloudVMs struct {
// Time after shutdown to retry shutdown
TimeoutShutdown Duration
+ // Maximum create/destroy-instance operations per second
+ MaxCloudOpsPerSecond int
+
ImageID string
Driver string
TLS bool
Insecure bool
}
+
+type TLS struct {
+ Certificate string
+ Key string
+ Insecure bool
+}
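Because URL implements encoding.TextUnmarshaler, encoding/json (and any config loader that follows the same text-unmarshaling rules) can use it both as a map key for InternalURLs and as a plain string value for ExternalURL. A small sketch using hypothetical host names:

    package main

    import (
        "encoding/json"
        "fmt"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func main() {
        // Hypothetical config fragment matching the Services/Service
        // structs defined above.
        blob := []byte(`{
            "Controller": {
                "ExternalURL": "https://ctrl.example:443",
                "InternalURLs": {"http://localhost:8003/": {}}
            }
        }`)
        var svcs arvados.Services
        if err := json.Unmarshal(blob, &svcs); err != nil {
            panic(err)
        }
        fmt.Println(svcs.Controller.ExternalURL.Host)  // ctrl.example:443
        fmt.Println(len(svcs.Controller.InternalURLs)) // 1
    }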
import (
"encoding/json"
+ "errors"
"fmt"
"log"
"os"
}
svcListCacheMtx.Unlock()
- return kc.loadKeepServers(<-cacheEnt.latest)
+ select {
+ case <-time.After(time.Minute):
+ return errors.New("timed out while getting initial list of keep services")
+ case sl := <-cacheEnt.latest:
+ return kc.loadKeepServers(sl)
+ }
}
func (kc *KeepClient) RefreshServiceDiscovery() {
lockdir = "/var/lock"
lockprefix = "crunch-run-"
locksuffix = ".lock"
+ brokenfile = "crunch-run-broken"
)
// procinfo is saved in each process's lockfile.
proc, err := os.FindProcess(pi.PID)
if err != nil {
+ // FindProcess should have succeeded, even if the
+ // process does not exist.
return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
}
+ // Send the requested signal once, then send signal 0 a few
+ // times. When proc.Signal() returns an error (process no
+ // longer exists), return success. If that doesn't happen
+ // within 1 second, return an error.
err = proc.Signal(signal)
for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
err = proc.Signal(syscall.Signal(0))
}
if err == nil {
+ // Reached deadline without a proc.Signal() error.
return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
}
fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
if info.IsDir() && path != walkdir {
return filepath.SkipDir
}
- if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
+ if name := info.Name(); name == brokenfile {
+ fmt.Fprintln(stdout, "broken")
+ return nil
+ } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
return nil
}
if info.Size() == 0 {
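The kill loop above relies on the usual Unix convention that signal 0 performs error checking only: nothing is delivered, but Signal returns an error once the process no longer exists. A standalone sketch of that liveness check (names are illustrative):

    package main

    import (
        "fmt"
        "os"
        "syscall"
    )

    // alive reports whether a process with the given PID currently
    // exists (and is visible to us); signal 0 sends nothing but still
    // performs the existence/permission check.
    func alive(pid int) bool {
        proc, err := os.FindProcess(pid)
        if err != nil {
            // On Unix FindProcess always succeeds, but handle it anyway.
            return false
        }
        return proc.Signal(syscall.Signal(0)) == nil
    }

    func main() {
        fmt.Println(alive(os.Getpid())) // true: this process exists
    }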
func (runner *ContainerRunner) runBrokenNodeHook() {
if *brokenNodeHook == "" {
- runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+ path := filepath.Join(lockdir, brokenfile)
+ runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
+ f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
+ if err != nil {
+ runner.CrunchLog.Printf("Error writing %s: %s", path, err)
+ return
+ }
+ f.Close()
} else {
runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
// run killme script
c.Check(api.CalledWith("container.state", "Queued"), NotNil)
c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
- c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
}
func (s *TestSuite) TestFullBrokenDocker3(c *C) {
import collections
import functools
import arvados.keep
+from prometheus_client import Summary
import Queue
"""
+ fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
+ read_time = fuse_time.labels(op='read')
+ write_time = fuse_time.labels(op='write')
+ destroy_time = fuse_time.labels(op='destroy')
+ on_event_time = fuse_time.labels(op='on_event')
+ getattr_time = fuse_time.labels(op='getattr')
+ setattr_time = fuse_time.labels(op='setattr')
+ lookup_time = fuse_time.labels(op='lookup')
+ forget_time = fuse_time.labels(op='forget')
+ open_time = fuse_time.labels(op='open')
+ release_time = fuse_time.labels(op='release')
+ opendir_time = fuse_time.labels(op='opendir')
+ readdir_time = fuse_time.labels(op='readdir')
+ statfs_time = fuse_time.labels(op='statfs')
+ create_time = fuse_time.labels(op='create')
+ mkdir_time = fuse_time.labels(op='mkdir')
+ unlink_time = fuse_time.labels(op='unlink')
+ rmdir_time = fuse_time.labels(op='rmdir')
+ rename_time = fuse_time.labels(op='rename')
+ flush_time = fuse_time.labels(op='flush')
+
def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
super(Operations, self).__init__()
# initializing to continue
self.initlock.set()
+ def metric_samples(self):
+ return self.fuse_time.collect()[0].samples
+
+ def metric_op_names(self):
+ ops = []
+ for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
+ if cur_op not in ops:
+ ops.append(cur_op)
+ return ops
+
+ def metric_value(self, opname, metric):
+ op_value = [sample.value for sample in self.metric_samples()
+ if sample.name == metric and sample.labels['op'] == opname]
+ return op_value[0] if len(op_value) == 1 else None
+
+ def metric_sum_func(self, opname):
+ return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
+
+ def metric_count_func(self, opname):
+ return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
+
+ @destroy_time.time()
@catch_exceptions
def destroy(self):
self._shutdown_started.set()
[["event_type", "in", ["create", "update", "delete"]]],
self.on_event)
+ @on_event_time.time()
@catch_exceptions
def on_event(self, ev):
if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
self.inodes.inode_cache.find_by_uuid(newowner)):
parent.child_event(ev)
+ @getattr_time.time()
@catch_exceptions
def getattr(self, inode, ctx=None):
if inode not in self.inodes:
return entry
+ @setattr_time.time()
@catch_exceptions
def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
entry = self.getattr(inode)
return entry
+ @lookup_time.time()
@catch_exceptions
def lookup(self, parent_inode, name, ctx=None):
name = unicode(name, self.inodes.encoding)
parent_inode, name)
raise llfuse.FUSEError(errno.ENOENT)
+ @forget_time.time()
@catch_exceptions
def forget(self, inodes):
if self._shutdown_started.is_set():
if ent.dec_ref(nlookup) == 0 and ent.dead:
self.inodes.del_entry(ent)
+ @open_time.time()
@catch_exceptions
def open(self, inode, flags, ctx=None):
if inode in self.inodes:
return fh
+ @read_time.time()
@catch_exceptions
def read(self, fh, off, size):
_logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
self.read_counter.add(len(r))
return r
+ @write_time.time()
@catch_exceptions
def write(self, fh, off, buf):
_logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
self.write_counter.add(w)
return w
+ @release_time.time()
@catch_exceptions
def release(self, fh):
if fh in self._filehandles:
def releasedir(self, fh):
self.release(fh)
+ @opendir_time.time()
@catch_exceptions
def opendir(self, inode, ctx=None):
_logger.debug("arv-mount opendir: inode %i", inode)
self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
return fh
+ @readdir_time.time()
@catch_exceptions
def readdir(self, fh, off):
_logger.debug("arv-mount readdir: fh %i off %i", fh, off)
yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
e += 1
+ @statfs_time.time()
@catch_exceptions
def statfs(self, ctx=None):
st = llfuse.StatvfsData()
return p
+ @create_time.time()
@catch_exceptions
def create(self, inode_parent, name, mode, flags, ctx=None):
_logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
f.inc_ref()
return (fh, self.getattr(f.inode))
+ @mkdir_time.time()
@catch_exceptions
def mkdir(self, inode_parent, name, mode, ctx=None):
_logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
d.inc_ref()
return self.getattr(d.inode)
+ @unlink_time.time()
@catch_exceptions
def unlink(self, inode_parent, name, ctx=None):
_logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.unlink(name)
+ @rmdir_time.time()
@catch_exceptions
def rmdir(self, inode_parent, name, ctx=None):
_logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.rmdir(name)
+ @rename_time.time()
@catch_exceptions
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
_logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
dest = self._check_writable(inode_parent_new)
dest.rename(name_old, name_new, src)
+ @flush_time.time()
@catch_exceptions
def flush(self, fh):
if fh in self._filehandles:
import sys
import time
+from collections import namedtuple
-class Stat(object):
- def __init__(self, prefix, interval,
- egr_name, ing_name,
- egr_func, ing_func):
+Stat = namedtuple("Stat", ['name', 'get'])
+
+class StatWriter(object):
+ def __init__(self, prefix, interval, stats):
self.prefix = prefix
self.interval = interval
- self.egr_name = egr_name
- self.ing_name = ing_name
- self.egress = egr_func
- self.ingress = ing_func
- self.egr_prev = self.egress()
- self.ing_prev = self.ingress()
-
- def update(self):
- egr = self.egress()
- ing = self.ingress()
+ self.stats = stats
+ self.previous_stats = []
+ self.update_previous_stats()
- delta = " -- interval %.4f seconds %d %s %d %s" % (self.interval,
- egr - self.egr_prev,
- self.egr_name,
- ing - self.ing_prev,
- self.ing_name)
+ def update_previous_stats(self):
+ self.previous_stats = [stat.get() for stat in self.stats]
- sys.stderr.write("crunchstat: %s %d %s %d %s%s\n" % (self.prefix,
- egr,
- self.egr_name,
- ing,
- self.ing_name,
- delta))
+ def update(self):
+ def append_by_type(string, name, value):
+ if type(value) is float:
+ string += " %.6f %s" % (value, name)
+ else:
+ string += " %s %s" % (str(value), name)
+ return string
- self.egr_prev = egr
- self.ing_prev = ing
+ out = "crunchstat: %s" % self.prefix
+ delta = "-- interval %.4f seconds" % self.interval
+ for i, stat in enumerate(self.stats):
+ value = stat.get()
+ diff = value - self.previous_stats[i]
+ delta = append_by_type(delta, stat.name, diff)
+ out = append_by_type(out, stat.name, value)
+ sys.stderr.write("%s %s\n" % (out, delta))
+ self.update_previous_stats()
def statlogger(interval, keep, ops):
- calls = Stat("keepcalls", interval, "put", "get",
- keep.put_counter.get,
- keep.get_counter.get)
- net = Stat("net:keep0", interval, "tx", "rx",
- keep.upload_counter.get,
- keep.download_counter.get)
- cache = Stat("keepcache", interval, "hit", "miss",
- keep.hits_counter.get,
- keep.misses_counter.get)
- fuseops = Stat("fuseops", interval,"write", "read",
- ops.write_ops_counter.get,
- ops.read_ops_counter.get)
- blk = Stat("blkio:0:0", interval, "write", "read",
- ops.write_counter.get,
- ops.read_counter.get)
+ calls = StatWriter("keepcalls", interval, [
+ Stat("put", keep.put_counter.get),
+ Stat("get", keep.get_counter.get)
+ ])
+ net = StatWriter("net:keep0", interval, [
+ Stat("tx", keep.upload_counter.get),
+ Stat("rx", keep.download_counter.get)
+ ])
+ cache = StatWriter("keepcache", interval, [
+ Stat("hit", keep.hits_counter.get),
+ Stat("miss", keep.misses_counter.get)
+ ])
+ fuseops = StatWriter("fuseops", interval, [
+ Stat("write", ops.write_ops_counter.get),
+ Stat("read", ops.read_ops_counter.get)
+ ])
+ fusetimes = []
+ for cur_op in ops.metric_op_names():
+ name = "fuseop:{0}".format(cur_op)
+ fusetimes.append(StatWriter(name, interval, [
+ Stat("count", ops.metric_count_func(cur_op)),
+ Stat("time", ops.metric_sum_func(cur_op))
+ ]))
+ blk = StatWriter("blkio:0:0", interval, [
+ Stat("write", ops.write_counter.get),
+ Stat("read", ops.read_counter.get)
+ ])
while True:
time.sleep(interval)
calls.update()
net.update()
cache.update()
- fuseops.update()
blk.update()
+ fuseops.update()
+ for ftime in fusetimes:
+ ftime.update()
'llfuse >=1.2, <1.3.4',
'python-daemon',
'ciso8601 >= 2.0.0',
- 'setuptools'
+ 'setuptools',
+ "prometheus_client"
],
extras_require={
':python_version<"3"': ['pytz'],
+++ /dev/null
-/usr/local/lib/arvbox/runsu.sh
\ No newline at end of file
--- /dev/null
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+exec 2>&1
+set -ex -o pipefail
+
+. /usr/local/lib/arvbox/common.sh
+
+cat <<EOF >/var/lib/arvados/nginx.conf
+worker_processes auto;
+pid /var/lib/arvados/nginx.pid;
+
+error_log stderr;
+daemon off;
+user arvbox;
+
+events {
+ worker_connections 64;
+}
+
+http {
+ access_log off;
+ include /etc/nginx/mime.types;
+ default_type application/octet-stream;
+ server {
+ listen ${services[doc]} default_server;
+ listen [::]:${services[doc]} default_server;
+ root /usr/src/arvados/doc/.site;
+ index index.html;
+ server_name _;
+ }
+
+ server {
+ listen 80 default_server;
+ server_name _;
+ return 301 https://\$host\$request_uri;
+ }
+
+ upstream controller {
+ server localhost:${services[controller]};
+ }
+ server {
+ listen *:${services[controller-ssl]} ssl default_server;
+ server_name controller;
+ ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+ ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+ location / {
+ proxy_pass http://controller;
+ proxy_set_header Host \$http_host;
+ proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+ proxy_set_header X-Forwarded-Proto https;
+ proxy_redirect off;
+ }
+ }
+
+upstream arvados-ws {
+ server localhost:${services[websockets]};
+}
+server {
+ listen *:${services[websockets-ssl]} ssl default_server;
+ server_name websockets;
+
+ proxy_connect_timeout 90s;
+ proxy_read_timeout 300s;
+
+ ssl on;
+ ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+ ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+
+ location / {
+ proxy_pass http://arvados-ws;
+ proxy_set_header Upgrade \$http_upgrade;
+ proxy_set_header Connection "upgrade";
+ proxy_set_header Host \$http_host;
+ proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+ }
+}
+
+ upstream workbench2 {
+ server localhost:${services[workbench2]};
+ }
+ server {
+ listen *:${services[workbench2-ssl]} ssl default_server;
+ server_name workbench2;
+ ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+ ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+ location / {
+ proxy_pass http://workbench2;
+ proxy_set_header Host \$http_host;
+ proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+ proxy_set_header X-Forwarded-Proto https;
+ proxy_redirect off;
+ }
+ location /sockjs-node {
+ proxy_pass http://workbench2;
+ proxy_set_header Upgrade \$http_upgrade;
+ proxy_set_header Connection "upgrade";
+ proxy_set_header Host \$http_host;
+ proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+ }
+ }
+
+ upstream keep-web {
+ server localhost:${services[keep-web]};
+ }
+ server {
+ listen *:${services[keep-web-ssl]} ssl default_server;
+ server_name keep-web;
+ ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+ ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+ location / {
+ proxy_pass http://keep-web;
+ proxy_set_header Host \$http_host;
+ proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+ proxy_set_header X-Forwarded-Proto https;
+ proxy_redirect off;
+ }
+ }
+
+}
+
+EOF
+
+exec nginx -c /var/lib/arvados/nginx.conf
+++ /dev/null
-#!/bin/bash
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-exec 2>&1
-set -ex -o pipefail
-
-. /usr/local/lib/arvbox/common.sh
-
-cat <<EOF >/var/lib/arvados/nginx.conf
-worker_processes auto;
-pid /var/lib/arvados/nginx.pid;
-
-error_log stderr;
-daemon off;
-
-events {
- worker_connections 64;
-}
-
-http {
- access_log off;
- include /etc/nginx/mime.types;
- default_type application/octet-stream;
- server {
- listen ${services[doc]} default_server;
- listen [::]:${services[doc]} default_server;
- root /usr/src/arvados/doc/.site;
- index index.html;
- server_name _;
- }
-
- upstream controller {
- server localhost:${services[controller]};
- }
- server {
- listen *:${services[controller-ssl]} ssl default_server;
- server_name controller;
- ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
- ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
- location / {
- proxy_pass http://controller;
- proxy_set_header Host \$http_host;
- proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
- proxy_set_header X-Forwarded-Proto https;
- proxy_redirect off;
- }
- }
-
-upstream arvados-ws {
- server localhost:${services[websockets]};
-}
-server {
- listen *:${services[websockets-ssl]} ssl default_server;
- server_name websockets;
-
- proxy_connect_timeout 90s;
- proxy_read_timeout 300s;
-
- ssl on;
- ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
- ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
-
- location / {
- proxy_pass http://arvados-ws;
- proxy_set_header Upgrade \$http_upgrade;
- proxy_set_header Connection "upgrade";
- proxy_set_header Host \$http_host;
- proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
- }
-}
-
- upstream workbench2 {
- server localhost:${services[workbench2]};
- }
- server {
- listen *:${services[workbench2-ssl]} ssl default_server;
- server_name workbench2;
- ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
- ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
- location / {
- proxy_pass http://workbench2;
- proxy_set_header Host \$http_host;
- proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
- proxy_set_header X-Forwarded-Proto https;
- proxy_redirect off;
- }
- location /sockjs-node {
- proxy_pass http://workbench2;
- proxy_set_header Upgrade \$http_upgrade;
- proxy_set_header Connection "upgrade";
- proxy_set_header Host \$http_host;
- proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
- }
- }
-
- upstream keep-web {
- server localhost:${services[keep-web]};
- }
- server {
- listen *:${services[keep-web-ssl]} ssl default_server;
- server_name keep-web;
- ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
- ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
- location / {
- proxy_pass http://keep-web;
- proxy_set_header Host \$http_host;
- proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
- proxy_set_header X-Forwarded-Proto https;
- proxy_redirect off;
- }
- }
-
-}
-
-EOF
-
-exec nginx -c /var/lib/arvados/nginx.conf
import argparse
import gzip
+from io import open
import logging
import sys
help='Log more information (once for progress, twice for debug)')
+class UTF8Decode(object):
+ '''Wrap a file-like iterable to decode UTF-8 bytes into strings
+ '''
+ def __init__(self, fh):
+ self.fh = fh
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return next(self.fh).decode('utf-8')
+
+ next = __next__
+
+ def close(self):
+ # mimic Gzip behavior and don't close underlying object
+ pass
+
+
class Command(object):
def __init__(self, args):
self.args = args
self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
elif self.args.log_file:
if self.args.log_file.endswith('.gz'):
- fh = gzip.open(self.args.log_file)
+ fh = UTF8Decode(gzip.open(self.args.log_file))
else:
- fh = open(self.args.log_file)
+ fh = open(self.args.log_file, mode='r', encoding='utf-8')
self.summer = summarizer.Summarizer(fh, **kwargs)
else:
self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
'data': self._collate_data(tasks, stat),
'options': {
'connectSeparatedPoints': True,
- 'labels': ['elapsed']+[uuid for uuid, _ in tasks.iteritems()],
+ 'labels': ['elapsed']+[uuid for uuid, _ in tasks.items()],
'title': '{}: {} {}'.format(label, stat[0], stat[1]),
},
}
def _collate_data(self, tasks, stat):
data = []
nulls = []
- for uuid, task in tasks.iteritems():
+ for uuid, task in tasks.items():
for pt in task.series[stat]:
data.append([pt[0].total_seconds()] + nulls + [pt[1]])
nulls.append(None)
#
# SPDX-License-Identifier: AGPL-3.0
-from __future__ import print_function
-
import arvados
import itertools
-import Queue
+import queue
import threading
from crunchstat_summary import logger
self._queue.put(self.EOF)
def __iter__(self):
- self._queue = Queue.Queue()
+ self._queue = queue.Queue()
self._thread = threading.Thread(target=self._get_all_pages)
self._thread.daemon = True
self._thread.start()
return self
- def next(self):
+ def __next__(self):
line = self._queue.get()
if line is self.EOF:
self._thread.join()
raise StopIteration
return line
+ next = __next__ # for Python 2
+
def __enter__(self):
return self
#
# SPDX-License-Identifier: AGPL-3.0
-from __future__ import print_function
-
import arvados
import collections
import crunchstat_summary.dygraphs
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
-
+MB = 2**20
# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
if 'tx' in stats or 'rx' in stats:
stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
- for stat, val in stats.iteritems():
+ for stat, val in stats.items():
if group == 'interval':
if stat == 'seconds':
this_interval_s = val
self.job_tot = collections.defaultdict(
functools.partial(collections.defaultdict, int))
- for task_id, task_stat in self.task_stats.iteritems():
- for category, stat_last in task_stat.iteritems():
- for stat, val in stat_last.iteritems():
+ for task_id, task_stat in self.task_stats.items():
+ for category, stat_last in task_stat.items():
+ for stat, val in stat_last.items():
if stat in ['cpus', 'cache', 'swap', 'rss']:
# meaningless stats like 16 cpu cores x 5 tasks = 80
continue
def _text_report_gen(self):
yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
- for category, stat_max in sorted(self.stats_max.iteritems()):
- for stat, val in sorted(stat_max.iteritems()):
+ for category, stat_max in sorted(self.stats_max.items()):
+ for stat, val in sorted(stat_max.items()):
if stat.endswith('__rate'):
continue
max_rate = self._format(stat_max.get(stat+'__rate', '-'))
self.stats_max['cpu']['user+sys__rate'],
lambda x: x * 100),
('Overall CPU usage: {}%',
- self.job_tot['cpu']['user+sys'] /
+ float(self.job_tot['cpu']['user+sys']) /
self.job_tot['time']['elapsed']
if self.job_tot['time']['elapsed'] > 0 else 0,
lambda x: x * 100),
yield "# "+format_string.format(self._format(val))
def _recommend_gen(self):
+ # TODO recommend fixing job granularity if elapsed time is too short
return itertools.chain(
self._recommend_cpu(),
self._recommend_ram(),
constraint_key = self._map_runtime_constraint('vcpus')
cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
- if cpu_max_rate == float('-Inf'):
+ if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
logger.warning('%s: no CPU usage data', self.label)
return
+ # TODO: don't base the recommendation on an isolated max peak alone;
+ # take average CPU usage (or % of time spent at max) into account as well
used_cores = max(1, int(math.ceil(cpu_max_rate)))
asked_cores = self.existing_constraints.get(constraint_key)
- if asked_cores is None or used_cores < asked_cores:
+ if asked_cores is None:
+ asked_cores = 1
+ # TODO: This should be more nuanced in cases where max >> avg
+ if used_cores < asked_cores:
yield (
'#!! {} max CPU usage was {}% -- '
- 'try runtime_constraints "{}":{}'
+ 'try reducing runtime_constraints to "{}":{}'
).format(
self.label,
- int(math.ceil(cpu_max_rate*100)),
+ math.ceil(cpu_max_rate*100),
constraint_key,
int(used_cores))
+ # FIXME: This needs to be updated to account for current nodemanager algorithms
def _recommend_ram(self):
"""Recommend an economical RAM constraint for this job.
if used_bytes == float('-Inf'):
logger.warning('%s: no memory usage data', self.label)
return
- used_mib = math.ceil(float(used_bytes) / 1048576)
+ used_mib = math.ceil(float(used_bytes) / MB)
asked_mib = self.existing_constraints.get(constraint_key)
nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
- if asked_mib is None or (
- math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
+ if used_mib > 0 and (asked_mib is None or (
+ math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
yield (
'#!! {} max RSS was {} MiB -- '
- 'try runtime_constraints "{}":{}'
+ 'try reducing runtime_constraints to "{}":{}'
).format(
self.label,
int(used_mib),
constraint_key,
- int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))
+ int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
def _recommend_keep_cache(self):
"""Recommend increasing keep cache if utilization < 80%"""
return
utilization = (float(self.job_tot['blkio:0:0']['read']) /
float(self.job_tot['net:keep0']['rx']))
- asked_mib = self.existing_constraints.get(constraint_key, 256)
+ # FIXME: the default on this get won't work correctly
+ asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
if utilization < 0.8:
yield (
'#!! {} Keep cache utilization was {:.2f}% -- '
- 'try runtime_constraints "{}":{} (or more)'
+ 'try doubling runtime_constraints to "{}":{} (or more)'
).format(
self.label,
utilization * 100.0,
constraint_key,
- asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())
+ math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
def _format(self, val):
class JobSummarizer(ProcessSummarizer):
- runtime_constraint_mem_unit = 1048576
+ runtime_constraint_mem_unit = MB
map_runtime_constraint = {
'keep_cache_ram': 'keep_cache_mb_per_task',
'ram': 'min_ram_mb_per_node',
def run(self):
threads = []
- for child in self.children.itervalues():
+ for child in self.children.values():
self.throttle.acquire()
t = threading.Thread(target=self.run_and_release, args=(child.run, ))
t.daemon = True
def text_report(self):
txt = ''
d = self._descendants()
- for child in d.itervalues():
+ for child in d.values():
if len(d) > 1:
txt += '### Summary for {} ({})\n'.format(
child.label, child.process['uuid'])
MultiSummarizers) are omitted.
"""
d = collections.OrderedDict()
- for key, child in self.children.iteritems():
+ for key, child in self.children.items():
if isinstance(child, Summarizer):
d[key] = child
if isinstance(child, MultiSummarizer):
return d
def html_report(self):
- return WEBCHART_CLASS(self.label, self._descendants().itervalues()).html()
+ return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
class JobTreeSummarizer(MultiSummarizer):
preloaded = {}
for j in arv.jobs().index(
limit=len(job['components']),
- filters=[['uuid','in',job['components'].values()]]).execute()['items']:
+ filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
preloaded[j['uuid']] = j
for cname in sorted(job['components'].keys()):
child_uuid = job['components'][cname]
class PipelineSummarizer(MultiSummarizer):
def __init__(self, instance, **kwargs):
children = collections.OrderedDict()
- for cname, component in instance['components'].iteritems():
+ for cname, component in instance['components'].items():
if 'job' not in component:
logger.warning(
"%s: skipping component with no job assigned", cname)
cr['name'] = cr.get('name') or cr['uuid']
todo.append(cr)
sorted_children = collections.OrderedDict()
- for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
+ for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
sorted_children[uuid] = children[uuid]
super(ContainerTreeSummarizer, self).__init__(
children=sorted_children,
#
# SPDX-License-Identifier: AGPL-3.0
-import cgi
+try:
+ from html import escape
+except ImportError:
+ from cgi import escape
+
import json
import pkg_resources
<script type="text/javascript">{}</script>
{}
</head><body></body></html>
- '''.format(cgi.escape(self.label),
+ '''.format(escape(self.label),
self.JSLIB, self.js(), self.headHTML())
def js(self):
return 'var chartdata = {};\n{}'.format(
json.dumps(self.sections()),
- '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa) for jsa in self.JSASSETS]))
+ '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa).decode('utf-8') for jsa in self.JSASSETS]))
def sections(self):
return [
# Number of tasks: 1
# Max CPU time spent by a single task: 0s
# Max CPU usage in a single interval: 0%
-# Overall CPU usage: 0%
+# Overall CPU usage: 0.00%
# Max memory used by a single task: 0.00GB
# Max network traffic in a single task: 0.00GB
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! container max CPU usage was 0% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 0 MiB -- try runtime_constraints "ram":0
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! container max CPU usage was 24% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 67 MiB -- try runtime_constraints "ram":1020054732
+#!! container max RSS was 67 MiB -- try reducing runtime_constraints to "ram":1020054732
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! container max CPU usage was 24% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 67 MiB -- try runtime_constraints "ram":1020054732
+#!! container max RSS was 67 MiB -- try reducing runtime_constraints to "ram":1020054732
# Max network speed in a single interval: 42.58MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-jq0ekny1xou3zoh max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-zvb2ocfycpomrup max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
# Max network speed in a single interval: 0.00MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-v831jm2uq0g2g9x max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
import difflib
import glob
import gzip
+from io import open
import mock
import os
+import sys
import unittest
+from crunchstat_summary.command import UTF8Decode
+
TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
class ReportDiff(unittest.TestCase):
def diff_known_report(self, logfile, cmd):
expectfile = logfile+'.report'
- expect = open(expectfile).readlines()
+ with open(expectfile, encoding='utf-8') as f:
+ expect = f.readlines()
self.diff_report(cmd, expect, expectfile=expectfile)
- def diff_report(self, cmd, expect, expectfile=None):
+ def diff_report(self, cmd, expect, expectfile='(expected)'):
got = [x+"\n" for x in cmd.report().strip("\n").split("\n")]
self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
expect, got, fromfile=expectfile, tofile="(generated)")))
['--format=html', '--log-file', logfile])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
- self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
+ if sys.version_info >= (3,2):
+ self.assertRegex(cmd.report(), r'(?is)<html>.*</html>\s*$')
+ else:
+ self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
class SummarizeEdgeCases(unittest.TestCase):
def test_error_messages(self):
- logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'))
+ logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'), encoding='utf-8')
s = crunchstat_summary.summarizer.Summarizer(logfile)
s.run()
'container.json', 'crunchstat.txt', 'arv-mount.txt']
def _open(n):
if n == "crunchstat.txt":
- return gzip.open(self.logfile)
+ return UTF8Decode(gzip.open(self.logfile))
elif n == "arv-mount.txt":
- return gzip.open(self.arvmountlog)
+ return UTF8Decode(gzip.open(self.arvmountlog))
mock_cr().open.side_effect = _open
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_request['uuid']])
def test_job_report(self, mock_api, mock_cr):
mock_api().jobs().get().execute.return_value = self.fake_job
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.return_value = gzip.open(self.logfile)
+ mock_cr().open.return_value = UTF8Decode(gzip.open(self.logfile))
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_job_uuid])
cmd = crunchstat_summary.command.Command(args)
mock_api().pipeline_instances().get().execute. \
return_value = self.fake_instance
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+ mock_cr().open.side_effect = [UTF8Decode(gzip.open(logfile)) for _ in range(3)]
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--pipeline-instance', self.fake_instance['uuid']])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
- job_report = [
- line for line in open(logfile+'.report').readlines()
- if not line.startswith('#!! ')]
+ with open(logfile+'.report', encoding='utf-8') as f:
+ job_report = [line for line in f if not line.startswith('#!! ')]
expect = (
['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
job_report + ['\n'] +
mock_api().jobs().index().execute.return_value = self.fake_jobs_index
mock_api().jobs().get().execute.return_value = self.fake_job
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+ mock_cr().open.side_effect = [UTF8Decode(gzip.open(logfile)) for _ in range(3)]
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_job['uuid']])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
- job_report = [
- line for line in open(logfile+'.report').readlines()
- if not line.startswith('#!! ')]
+ with open(logfile+'.report', encoding='utf-8') as f:
+ job_report = [line for line in f if not line.startswith('#!! ')]
expect = (
['### Summary for zzzzz-8i9sb-i3e77t9z5y8j9cc (partial) (zzzzz-8i9sb-i3e77t9z5y8j9cc)\n',
'(no report generated)\n',