Merge branch '14873-google-api-client-update'
author     Lucas Di Pentima <ldipentima@veritasgenetics.com>
           Wed, 20 Mar 2019 20:17:07 +0000 (17:17 -0300)
committer  Lucas Di Pentima <ldipentima@veritasgenetics.com>
           Wed, 20 Mar 2019 20:17:07 +0000 (17:17 -0300)
Refs #14873

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

14 files changed:
build/run-library.sh
build/run-tests.sh
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/scheduler/interfaces.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/scheduler/sync.go
lib/dispatchcloud/test/queue.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/runner.go [new file with mode: 0644]
lib/dispatchcloud/worker/worker.go
lib/dispatchcloud/worker/worker_test.go
sdk/go/arvados/config.go
services/crunch-run/background.go

diff --git a/build/run-library.sh b/build/run-library.sh
index de9d67d41cbe3f94a6bb9ca1c9ac1a819051dc35..1daceff2393537485d0dd51f381648076f43d512 100755
@@ -826,14 +826,13 @@ install_package() {
   fi
 }
 
-title () {
-    txt="********** $1 **********"
-    printf "\n%*s%s\n\n" $((($COLUMNS-${#txt})/2)) "" "$txt"
+title() {
+    printf '%s %s\n' "=======" "$1"
 }
 
 checkexit() {
     if [[ "$1" != "0" ]]; then
-        title "!!!!!! $2 FAILED !!!!!!"
+        title "$2 -- FAILED"
         failures+=("$2 (`timer`)")
     else
         successes+=("$2 (`timer`)")
@@ -856,7 +855,9 @@ report_outcomes() {
 
     if [[ ${#failures[@]} == 0 ]]
     then
-        echo "All test suites passed."
+        if [[ ${#successes[@]} != 0 ]]; then
+           echo "All test suites passed."
+        fi
     else
         echo "Failures (${#failures[@]}):"
         for x in "${failures[@]}"
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 095d32eaae39de11bc031619d121bbe16562c2ed..d4afb52fa34382a33bca323ebbdf26b56e89efd4 100755
@@ -19,6 +19,10 @@ Syntax:
 Options:
 
 --skip FOO     Do not test the FOO component.
+--skip sanity  Skip initial dev environment sanity checks.
+--skip install Do not run any install steps. Just run tests.
+               You should provide GOPATH, GEMHOME, and VENVDIR options
+               from a previous invocation if you use this option.
 --only FOO     Do not test anything except the FOO component.
 --temp DIR     Install components and dependencies under DIR instead of
                making a new temporary directory. Implies --leave-temp.
@@ -27,11 +31,9 @@ Options:
                subsequent invocations.
 --repeat N     Repeat each install/test step until it succeeds N times.
 --retry        Prompt to retry if an install or test suite fails.
---skip-install Do not run any install steps. Just run tests.
-               You should provide GOPATH, GEMHOME, and VENVDIR options
-               from a previous invocation if you use this option.
 --only-install Run specific install step
 --short        Skip (or scale down) some slow tests.
+--interactive  Set up, then prompt for test/install steps to perform.
 WORKSPACE=path Arvados source tree to test.
 CONFIGSRC=path Dir with api server config files to copy into source tree.
                (If none given, leave config files alone in source tree.)
@@ -49,7 +51,7 @@ ARVADOS_DEBUG=1
 envvar=value   Set \$envvar to value. Primarily useful for WORKSPACE,
                *_test, and other examples shown above.
 
-Assuming --skip-install is not given, all components are installed
+Assuming "--skip install" is not given, all components are installed
 into \$GOPATH, \$VENDIR, and \$GEMHOME before running any tests. Many
 test suites depend on other components being installed, and installing
 everything tends to be quicker than debugging dependencies.
@@ -169,7 +171,9 @@ fatal() {
 
 exit_cleanly() {
     trap - INT
-    create-plot-data-from-log.sh $BUILD_NUMBER "$WORKSPACE/apps/workbench/log/test.log" "$WORKSPACE/apps/workbench/log/"
+    if which create-plot-data-from-log.sh >/dev/null; then
+        create-plot-data-from-log.sh $BUILD_NUMBER "$WORKSPACE/apps/workbench/log/test.log" "$WORKSPACE/apps/workbench/log/"
+    fi
     rotate_logfile "$WORKSPACE/apps/workbench/log/" "test.log"
     stop_services
     rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
@@ -179,6 +183,7 @@ exit_cleanly() {
 }
 
 sanity_checks() {
+    [[ -n "${skip[sanity]}" ]] && return 0
     ( [[ -n "$WORKSPACE" ]] && [[ -d "$WORKSPACE/services" ]] ) \
         || fatal "WORKSPACE environment variable not set to a source directory (see: $0 --help)"
     echo Checking dependencies:
@@ -303,8 +308,11 @@ do
         --short)
             short=1
             ;;
+        --interactive)
+            interactive=1
+            ;;
         --skip-install)
-            only_install=nothing
+            skip[install]=1
             ;;
         --only-install)
             only_install="$1"; shift
@@ -355,6 +363,10 @@ if [[ $NEED_SDK_R == false ]]; then
 fi
 
 start_services() {
+    if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
+        return 0
+    fi
+    . "$VENVDIR/bin/activate"
     echo 'Starting API, keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
     if [[ ! -d "$WORKSPACE/services/api/log" ]]; then
        mkdir -p "$WORKSPACE/services/api/log"
@@ -363,8 +375,10 @@ start_services() {
     if [[ -f "$WORKSPACE/tmp/api.pid" && ! -s "$WORKSPACE/tmp/api.pid" ]]; then
        rm -f "$WORKSPACE/tmp/api.pid"
     fi
+    all_services_stopped=
+    fail=0
     cd "$WORKSPACE" \
-        && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo fail=1) \
+        && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo "fail=1; false") \
         && export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
         && export ARVADOS_TEST_API_INSTALLED="$$" \
         && python sdk/python/tests/run_test_server.py start_controller \
@@ -372,18 +386,22 @@ start_services() {
         && python sdk/python/tests/run_test_server.py start_keep-web \
         && python sdk/python/tests/run_test_server.py start_arv-git-httpd \
         && python sdk/python/tests/run_test_server.py start_ws \
-        && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo fail=1) \
-        && (env | egrep ^ARVADOS)
-    if [[ -n "$fail" ]]; then
-       return 1
+        && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo "fail=1; false") \
+        && (env | egrep ^ARVADOS) \
+        || fail=1
+    deactivate
+    if [[ $fail != 0 ]]; then
+        unset ARVADOS_TEST_API_HOST
     fi
+    return $fail
 }
 
 stop_services() {
-    if [[ -z "$ARVADOS_TEST_API_HOST" ]]; then
+    if [[ -n "$all_services_stopped" ]]; then
         return
     fi
     unset ARVADOS_TEST_API_HOST
+    . "$VENVDIR/bin/activate" || return
     cd "$WORKSPACE" \
         && python sdk/python/tests/run_test_server.py stop_nginx \
         && python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
@@ -391,7 +409,9 @@ stop_services() {
         && python sdk/python/tests/run_test_server.py stop_keep-web \
         && python sdk/python/tests/run_test_server.py stop_keep_proxy \
         && python sdk/python/tests/run_test_server.py stop_controller \
-        && python sdk/python/tests/run_test_server.py stop
+        && python sdk/python/tests/run_test_server.py stop \
+        && all_services_stopped=1
+    deactivate
 }
 
 interrupt() {
@@ -400,36 +420,6 @@ interrupt() {
 }
 trap interrupt INT
 
-sanity_checks
-
-echo "WORKSPACE=$WORKSPACE"
-
-if [[ -z "$CONFIGSRC" ]] && [[ -d "$HOME/arvados-api-server" ]]; then
-    # Jenkins expects us to use this by default.
-    CONFIGSRC="$HOME/arvados-api-server"
-fi
-
-# Clean up .pyc files that may exist in the workspace
-cd "$WORKSPACE"
-find -name '*.pyc' -delete
-
-if [[ -z "$temp" ]]; then
-    temp="$(mktemp -d)"
-fi
-
-# Set up temporary install dirs (unless existing dirs were supplied)
-for tmpdir in VENVDIR VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
-do
-    if [[ -z "${!tmpdir}" ]]; then
-        eval "$tmpdir"="$temp/$tmpdir"
-    fi
-    if ! [[ -d "${!tmpdir}" ]]; then
-        mkdir "${!tmpdir}" || fatal "can't create ${!tmpdir} (does $temp exist?)"
-    fi
-done
-
-rm -vf "${WORKSPACE}/tmp/*.log"
-
 setup_ruby_environment() {
     if [[ -s "$HOME/.rvm/scripts/rvm" ]] ; then
         source "$HOME/.rvm/scripts/rvm"
@@ -528,114 +518,136 @@ setup_virtualenv() {
     "$venvdest/bin/pip" install --no-cache-dir 'mock>=1.0' 'pbr<1.7.0'
 }
 
-export PERLINSTALLBASE
-export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
+initialize() {
+    sanity_checks
 
-export R_LIBS
+    echo "WORKSPACE=$WORKSPACE"
 
-export GOPATH
-(
-    set -e
-    mkdir -p "$GOPATH/src/git.curoverse.com"
-    if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
-        for d in \
-            "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
-                "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
-                "$GOPATH/src/git.curoverse.com/arvados.git"; do
-            [[ -d "$d" ]] && rmdir "$d"
-        done
+    if [[ -z "$CONFIGSRC" ]] && [[ -d "$HOME/arvados-api-server" ]]; then
+        # Jenkins expects us to use this by default.
+        CONFIGSRC="$HOME/arvados-api-server"
     fi
-    for d in \
-        "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
-        "$GOPATH/src/git.curoverse.com/arvados.git"; do
-        [[ -h "$d" ]] && rm "$d"
-    done
-    ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
-    go get -v github.com/kardianos/govendor
-    cd "$GOPATH/src/git.curoverse.com/arvados.git"
-    if [[ -n "$short" ]]; then
-        go get -v -d ...
-        "$GOPATH/bin/govendor" sync
-    else
-        # Remove cached source dirs in workdir. Otherwise, they will
-        # not qualify as +missing or +external below, and we won't be
-        # able to detect that they're missing from vendor/vendor.json.
-        rm -rf vendor/*/
-        go get -v -d ...
-        "$GOPATH/bin/govendor" sync
-        [[ -z $("$GOPATH/bin/govendor" list +unused +missing +external | tee /dev/stderr) ]] \
-            || fatal "vendor/vendor.json has unused or missing dependencies -- try:
 
-(export GOPATH=\"${GOPATH}\"; cd \$GOPATH/src/git.curoverse.com/arvados.git && \$GOPATH/bin/govendor add +missing +external && \$GOPATH/bin/govendor remove +unused)
+    # Clean up .pyc files that may exist in the workspace
+    cd "$WORKSPACE"
+    find -name '*.pyc' -delete
 
-";
+    if [[ -z "$temp" ]]; then
+        temp="$(mktemp -d)"
     fi
-) || fatal "Go setup failed"
 
-setup_virtualenv "$VENVDIR" --python python2.7
-. "$VENVDIR/bin/activate"
+    # Set up temporary install dirs (unless existing dirs were supplied)
+    for tmpdir in VENVDIR VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
+    do
+        if [[ -z "${!tmpdir}" ]]; then
+            eval "$tmpdir"="$temp/$tmpdir"
+        fi
+        if ! [[ -d "${!tmpdir}" ]]; then
+            mkdir "${!tmpdir}" || fatal "can't create ${!tmpdir} (does $temp exist?)"
+        fi
+    done
 
-# Needed for run_test_server.py which is used by certain (non-Python) tests.
-pip install --no-cache-dir PyYAML \
-    || fatal "pip install PyYAML failed"
+    rm -vf "${WORKSPACE}/tmp/*.log"
 
-# Preinstall libcloud if using a fork; otherwise nodemanager "pip
-# install" won't pick it up by default.
-if [[ -n "$LIBCLOUD_PIN_SRC" ]]; then
-    pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
-        || pip install --pre --ignore-installed --no-cache-dir "$LIBCLOUD_PIN_SRC" >/dev/null \
-        || fatal "pip install apache-libcloud failed"
-fi
+    export PERLINSTALLBASE
+    export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
 
-# Deactivate Python 2 virtualenv
-deactivate
+    export R_LIBS
 
-declare -a pythonstuff
-pythonstuff=(
-    sdk/pam
-    sdk/python
-    sdk/python:py3
-    sdk/cwl
-    sdk/cwl:py3
-    services/dockercleaner:py3
-    services/fuse
-    services/nodemanager
-    tools/crunchstat-summary
-    )
+    export GOPATH
 
-# If Python 3 is available, set up its virtualenv in $VENV3DIR.
-# Otherwise, skip dependent tests.
-PYTHON3=$(which python3)
-if [[ ${?} = 0 ]]; then
-    setup_virtualenv "$VENV3DIR" --python python3
-else
-    PYTHON3=
-    cat >&2 <<EOF
+    # Jenkins config requires that glob tmp/*.log match something. Ensure
+    # that happens even if we don't end up running services that set up
+    # logging.
+    mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
+    touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
 
-Warning: python3 could not be found. Python 3 tests will be skipped.
+    unset http_proxy https_proxy no_proxy
 
-EOF
-fi
 
-# Reactivate Python 2 virtualenv
-. "$VENVDIR/bin/activate"
+    # Note: this must be the last time we change PATH, otherwise rvm will
+    # whine a lot.
+    setup_ruby_environment
+
+    echo "PATH is $PATH"
+}
+
+install_env() {
+    (
+        set -e
+        mkdir -p "$GOPATH/src/git.curoverse.com"
+        if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
+            for d in \
+                "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
+                    "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
+                    "$GOPATH/src/git.curoverse.com/arvados.git"; do
+                [[ -d "$d" ]] && rmdir "$d"
+            done
+        fi
+        for d in \
+            "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
+                "$GOPATH/src/git.curoverse.com/arvados.git"; do
+            [[ -h "$d" ]] && rm "$d"
+        done
+        ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+        go get -v github.com/kardianos/govendor
+        cd "$GOPATH/src/git.curoverse.com/arvados.git"
+        if [[ -n "$short" ]]; then
+            go get -v -d ...
+            "$GOPATH/bin/govendor" sync
+        else
+            # Remove cached source dirs in workdir. Otherwise, they will
+            # not qualify as +missing or +external below, and we won't be
+            # able to detect that they're missing from vendor/vendor.json.
+            rm -rf vendor/*/
+            go get -v -d ...
+            "$GOPATH/bin/govendor" sync
+            [[ -z $("$GOPATH/bin/govendor" list +unused +missing +external | tee /dev/stderr) ]] \
+                || fatal "vendor/vendor.json has unused or missing dependencies -- try:
+
+(export GOPATH=\"${GOPATH}\"; cd \$GOPATH/src/git.curoverse.com/arvados.git && \$GOPATH/bin/govendor add +missing +external && \$GOPATH/bin/govendor remove +unused)
 
-# Note: this must be the last time we change PATH, otherwise rvm will
-# whine a lot.
-setup_ruby_environment
+";
+        fi
+    ) || fatal "Go setup failed"
 
-echo "PATH is $PATH"
+    setup_virtualenv "$VENVDIR" --python python2.7
+    . "$VENVDIR/bin/activate"
 
-if ! which bundler >/dev/null
-then
-    gem install --user-install bundler || fatal 'Could not install bundler'
-fi
+    # Needed for run_test_server.py which is used by certain (non-Python) tests.
+    pip install --no-cache-dir PyYAML \
+        || fatal "pip install PyYAML failed"
+
+    # Preinstall libcloud if using a fork; otherwise nodemanager "pip
+    # install" won't pick it up by default.
+    if [[ -n "$LIBCLOUD_PIN_SRC" ]]; then
+        pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
+            || pip install --pre --ignore-installed --no-cache-dir "$LIBCLOUD_PIN_SRC" >/dev/null \
+            || fatal "pip install apache-libcloud failed"
+    fi
 
-# Jenkins config requires that glob tmp/*.log match something. Ensure
-# that happens even if we don't end up running services that set up
-# logging.
-mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
-touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
+    # Deactivate Python 2 virtualenv
+    deactivate
+
+    # If Python 3 is available, set up its virtualenv in $VENV3DIR.
+    # Otherwise, skip dependent tests.
+    PYTHON3=$(which python3)
+    if [[ ${?} = 0 ]]; then
+        setup_virtualenv "$VENV3DIR" --python python3
+    else
+        PYTHON3=
+        cat >&2 <<EOF
+
+Warning: python3 could not be found. Python 3 tests will be skipped.
+
+EOF
+    fi
+
+    if ! which bundler >/dev/null
+    then
+        gem install --user-install bundler || fatal 'Could not install bundler'
+    fi
+}
 
 retry() {
     remain="${repeat}"
@@ -644,7 +656,7 @@ retry() {
         if ${@}; then
             if [[ "$remain" -gt 1 ]]; then
                 remain=$((${remain}-1))
-                title "Repeating ${remain} more times"
+                title "(repeating ${remain} more times)"
             else
                 break
             fi
@@ -672,22 +684,39 @@ do_test() {
             suite="${1}"
             ;;
     esac
-    if [[ -z "${skip[$suite]}" && -z "${skip[$1]}" && \
-              (${#only[@]} -eq 0 || ${only[$suite]} -eq 1 || \
-                   ${only[$1]} -eq 1) ||
-                  ${only[$2]} -eq 1 ]]; then
-        retry do_test_once ${@}
-    else
-        title "Skipping ${1} tests"
+    if [[ -n "${skip[$suite]}" || \
+              -n "${skip[$1]}" || \
+              (${#only[@]} -ne 0 && ${only[$suite]} -eq 0 && ${only[$1]} -eq 0) ]]; then
+        return 0
     fi
+    case "${1}" in
+        services/api)
+            stop_services
+            ;;
+        doc | lib/cli | lib/cloud/azure | lib/cloud/ec2 | lib/cmd | lib/dispatchcloud/ssh_executor | lib/dispatchcloud/worker)
+            # don't care whether services are running
+            ;;
+        *)
+            if ! start_services; then
+                title "test $1 -- failed to start services"
+                return 1
+            fi
+            ;;
+    esac
+    retry do_test_once ${@}
 }
 
 do_test_once() {
     unset result
 
-    title "Running $1 tests"
+    title "test $1"
     timer_reset
-    if [[ "$2" == "go" ]]
+
+    if which deactivate >/dev/null; then deactivate; fi
+    if ! . "$VENVDIR/bin/activate"
+    then
+        result=1
+    elif [[ "$2" == "go" ]]
     then
         covername="coverage-$(echo "$1" | sed -e 's/\//_/g')"
         coverflags=("-covermode=count" "-coverprofile=$WORKSPACE/tmp/.$covername.tmp")
@@ -740,28 +769,25 @@ do_test_once() {
     fi
     result=${result:-$?}
     checkexit $result "$1 tests"
-    title "End of $1 tests (`timer`)"
+    title "test $1 -- `timer`"
     return $result
 }
 
 do_install() {
-  skipit=false
-
-  if [[ -z "${only_install}" || "${only_install}" == "${1}" || "${only_install}" == "${2}" ]]; then
-      retry do_install_once ${@}
-  else
-      skipit=true
-  fi
-
-  if [[ "$skipit" = true ]]; then
-    title "Skipping $1 install"
-  fi
+    if [[ -n "${skip[install]}" || ( -n "${only_install}" && "${only_install}" != "${1}" && "${only_install}" != "${2}" ) ]]; then
+        return 0
+    fi
+    retry do_install_once ${@}
 }
 
 do_install_once() {
-    title "Running $1 install"
+    title "install $1"
     timer_reset
-    if [[ "$2" == "go" ]]
+
+    if which deactivate >/dev/null; then deactivate; fi
+    if [[ "$1" != "env" ]] && ! . "$VENVDIR/bin/activate"; then
+        result=1
+    elif [[ "$2" == "go" ]]
     then
         go get -ldflags "-X main.version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}" -t "git.curoverse.com/arvados.git/$1"
     elif [[ "$2" == "pip" ]]
@@ -789,9 +815,9 @@ do_install_once() {
     else
         "install_$1"
     fi
-    result=$?
+    result=${result:-$?}
     checkexit $result "$1 install"
-    title "End of $1 install (`timer`)"
+    title "install $1 -- `timer`"
     return $result
 }
 
@@ -812,7 +838,6 @@ install_doc() {
         && bundle_install_trylocal \
         && rm -rf .site
 }
-do_install doc
 
 install_gem() {
     gemname=$1
@@ -824,56 +849,32 @@ install_gem() {
         && with_test_gemset gem install --no-ri --no-rdoc $(ls -t "$gemname"-*.gem|head -n1)
 }
 
-install_ruby_sdk() {
+install_sdk/ruby() {
     install_gem arvados sdk/ruby
 }
-do_install sdk/ruby ruby_sdk
 
-install_R_sdk() {
+install_sdk/R() {
   if [[ "$NEED_SDK_R" = true ]]; then
     cd "$WORKSPACE/sdk/R" \
        && Rscript --vanilla install_deps.R
   fi
 }
-do_install sdk/R R_sdk
 
-install_perl_sdk() {
+install_sdk/perl() {
     cd "$WORKSPACE/sdk/perl" \
         && perl Makefile.PL INSTALL_BASE="$PERLINSTALLBASE" \
         && make install INSTALLDIRS=perl
 }
-do_install sdk/perl perl_sdk
 
-install_cli() {
+install_sdk/cli() {
     install_gem arvados-cli sdk/cli
 }
-do_install sdk/cli cli
 
-install_login-sync() {
+install_services/login-sync() {
     install_gem arvados-login-sync services/login-sync
 }
-do_install services/login-sync login-sync
-
-# Install the Python SDK early. Various other test suites (like
-# keepproxy) bring up run_test_server.py, which imports the arvados
-# module. We can't actually *test* the Python SDK yet though, because
-# its own test suite brings up some of those other programs (like
-# keepproxy).
-for p in "${pythonstuff[@]}"
-do
-    dir=${p%:py3}
-    if [[ ${dir} = ${p} ]]; then
-        if [[ -z ${skip[python2]} ]]; then
-            do_install ${dir} pip
-        fi
-    elif [[ -n ${PYTHON3} ]]; then
-        if [[ -z ${skip[python3]} ]]; then
-            do_install ${dir} pip "$VENV3DIR/bin/"
-        fi
-    fi
-done
 
-install_apiserver() {
+install_services/api() {
     cd "$WORKSPACE/services/api" \
         && RAILS_ENV=test bundle_install_trylocal
 
@@ -921,7 +922,19 @@ install_apiserver() {
         && RAILS_ENV=test bundle exec rake db:setup \
         && RAILS_ENV=test bundle exec rake db:fixtures:load
 }
-do_install services/api apiserver
+
+declare -a pythonstuff
+pythonstuff=(
+    sdk/pam
+    sdk/python
+    sdk/python:py3
+    sdk/cwl
+    sdk/cwl:py3
+    services/dockercleaner:py3
+    services/fuse
+    services/nodemanager
+    tools/crunchstat-summary
+)
 
 declare -a gostuff
 gostuff=(
@@ -967,22 +980,15 @@ gostuff=(
     tools/keep-rsync
     tools/sync-groups
 )
-for g in "${gostuff[@]}"
-do
-    do_install "$g" go
-done
 
-install_workbench() {
+install_apps/workbench() {
     cd "$WORKSPACE/apps/workbench" \
         && mkdir -p tmp/cache \
         && RAILS_ENV=test bundle_install_trylocal \
         && RAILS_ENV=test RAILS_GROUPS=assets bundle exec rake npm:install
 }
-do_install apps/workbench workbench
 
-unset http_proxy https_proxy no_proxy
-
-test_doclinkchecker() {
+test_doc() {
     (
         set -e
         cd "$WORKSPACE/doc"
@@ -992,108 +998,239 @@ test_doclinkchecker() {
         PYTHONPATH=$WORKSPACE/sdk/python/ bundle exec rake linkchecker baseurl=file://$WORKSPACE/doc/.site/ arvados_workbench_host=https://workbench.$ARVADOS_API_HOST arvados_api_host=$ARVADOS_API_HOST
     )
 }
-do_test doc doclinkchecker
-
-stop_services
 
-test_apiserver() {
+test_services/api() {
     rm -f "$WORKSPACE/services/api/git-commit.version"
     cd "$WORKSPACE/services/api" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test TESTOPTS=-v ${testargs[services/api]}
 }
-do_test services/api apiserver
-
-# Shortcut for when we're only running apiserver tests. This saves a bit of time,
-# because we don't need to start up the api server for subsequent tests.
-if [ ! -z "$only" ] && [ "$only" == "services/api" ]; then
-  rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
-  exit_cleanly
-fi
-
-start_services || { stop_services; fatal "start_services"; }
 
-test_ruby_sdk() {
+test_sdk/ruby() {
     cd "$WORKSPACE/sdk/ruby" \
         && bundle exec rake test TESTOPTS=-v ${testargs[sdk/ruby]}
 }
-do_test sdk/ruby ruby_sdk
 
-test_R_sdk() {
+test_sdk/R() {
   if [[ "$NEED_SDK_R" = true ]]; then
     cd "$WORKSPACE/sdk/R" \
         && Rscript --vanilla run_test.R
   fi
 }
 
-do_test sdk/R R_sdk
-
-test_cli() {
+test_sdk/cli() {
     cd "$WORKSPACE/sdk/cli" \
         && mkdir -p /tmp/keep \
         && KEEP_LOCAL_STORE=/tmp/keep bundle exec rake test TESTOPTS=-v ${testargs[sdk/cli]}
 }
-do_test sdk/cli cli
 
-test_login-sync() {
+test_services/login-sync() {
     cd "$WORKSPACE/services/login-sync" \
         && bundle exec rake test TESTOPTS=-v ${testargs[services/login-sync]}
 }
-do_test services/login-sync login-sync
 
-test_nodemanager_integration() {
+test_services/nodemanager_integration() {
     cd "$WORKSPACE/services/nodemanager" \
         && tests/integration_test.py ${testargs[services/nodemanager_integration]}
 }
-do_test services/nodemanager_integration nodemanager_integration
 
-for p in "${pythonstuff[@]}"
-do
-    dir=${p%:py3}
-    if [[ ${dir} = ${p} ]]; then
-        if [[ -z ${skip[python2]} ]]; then
-            do_test ${dir} pip
-        fi
-    elif [[ -n ${PYTHON3} ]]; then
-        if [[ -z ${skip[python3]} ]]; then
-            do_test ${dir} pip "$VENV3DIR/bin/"
-        fi
-    fi
-done
-
-for g in "${gostuff[@]}"
-do
-    do_test "$g" go
-done
-
-test_workbench_units() {
+test_apps/workbench_units() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:units TESTOPTS=-v ${testargs[apps/workbench]}
 }
-do_test apps/workbench_units workbench_units
 
-test_workbench_functionals() {
+test_apps/workbench_functionals() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:functionals TESTOPTS=-v ${testargs[apps/workbench]}
 }
-do_test apps/workbench_functionals workbench_functionals
 
-test_workbench_integration() {
+test_apps/workbench_integration() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:integration TESTOPTS=-v ${testargs[apps/workbench]}
 }
-do_test apps/workbench_integration workbench_integration
-
 
-test_workbench_benchmark() {
+test_apps/workbench_benchmark() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:benchmark ${testargs[apps/workbench_benchmark]}
 }
-do_test apps/workbench_benchmark workbench_benchmark
 
-test_workbench_profile() {
+test_apps/workbench_profile() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:profile ${testargs[apps/workbench_profile]}
 }
-do_test apps/workbench_profile workbench_profile
 
+install_deps() {
+    # Install parts needed by test suites
+    do_install env
+    do_install cmd/arvados-server go
+    do_install sdk/cli
+    do_install sdk/perl
+    do_install sdk/python pip
+    do_install sdk/ruby
+    do_install services/api
+    do_install services/arv-git-httpd go
+    do_install services/keepproxy go
+    do_install services/keepstore go
+    do_install services/keep-web go
+    do_install services/ws go
+}
+
+install_all() {
+    do_install env
+    do_install doc
+    do_install sdk/ruby
+    do_install sdk/R
+    do_install sdk/perl
+    do_install sdk/cli
+    do_install services/login-sync
+    for p in "${pythonstuff[@]}"
+    do
+        dir=${p%:py3}
+        if [[ ${dir} = ${p} ]]; then
+            if [[ -z ${skip[python2]} ]]; then
+                do_install ${dir} pip
+            fi
+        elif [[ -n ${PYTHON3} ]]; then
+            if [[ -z ${skip[python3]} ]]; then
+                do_install ${dir} pip "$VENV3DIR/bin/"
+            fi
+        fi
+    done
+    do_install services/api
+    for g in "${gostuff[@]}"
+    do
+        do_install "$g" go
+    done
+    do_install apps/workbench
+}
+
+test_all() {
+    stop_services
+    do_test services/api
+
+    # Shortcut for when we're only running apiserver tests. This saves a bit of time,
+    # because we don't need to start up the api server for subsequent tests.
+    if [ ! -z "$only" ] && [ "$only" == "services/api" ]; then
+        rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
+        exit_cleanly
+    fi
+
+    do_test doc
+    do_test sdk/ruby
+    do_test sdk/R
+    do_test sdk/cli
+    do_test services/login-sync
+    do_test services/nodemanager_integration
+    for p in "${pythonstuff[@]}"
+    do
+        dir=${p%:py3}
+        if [[ ${dir} = ${p} ]]; then
+            if [[ -z ${skip[python2]} ]]; then
+                do_test ${dir} pip
+            fi
+        elif [[ -n ${PYTHON3} ]]; then
+            if [[ -z ${skip[python3]} ]]; then
+                do_test ${dir} pip "$VENV3DIR/bin/"
+            fi
+        fi
+    done
+
+    for g in "${gostuff[@]}"
+    do
+        do_test "$g" go
+    done
+    do_test apps/workbench_units
+    do_test apps/workbench_functionals
+    do_test apps/workbench_integration
+    do_test apps/workbench_benchmark
+    do_test apps/workbench_profile
+}
+
+help_interactive() {
+    echo "== Interactive commands:"
+    echo "TARGET                 (short for 'test DIR')"
+    echo "test TARGET"
+    echo "test TARGET:py3        (test with python3)"
+    echo "test TARGET -check.vv  (pass arguments to test)"
+    echo "install TARGET"
+    echo "install env            (go/python libs)"
+    echo "install deps           (go/python libs + arvados components needed for integration tests)"
+    echo "reset                  (...services used by integration tests)"
+    echo "exit"
+    echo "== Test targets:"
+    echo "${!testfuncargs[@]}" | tr ' ' '\n' | sort | column
+}
+
+initialize
+
+declare -A testfuncargs=()
+for g in "${gostuff[@]}"; do
+    testfuncargs[$g]="$g go"
+done
+for p in "${pythonstuff[@]}"; do
+    dir=${p%:py3}
+    if [[ ${dir} = ${p} ]]; then
+        testfuncargs[$p]="$dir pip $VENVDIR/bin/"
+    else
+        testfuncargs[$p]="$dir pip $VENV3DIR/bin/"
+    fi
+done
+
+if [[ -z ${interactive} ]]; then
+    install_all
+    test_all
+else
+    skip=()
+    only=()
+    only_install=()
+    if [[ -e "$VENVDIR/bin/activate" ]]; then stop_services; fi
+    setnextcmd() {
+        if [[ "$nextcmd" != "install deps" ]]; then
+            :
+        elif [[ -e "$VENVDIR/bin/activate" ]]; then
+            nextcmd="test lib/cmd"
+        else
+            nextcmd="install deps"
+        fi
+    }
+    echo
+    help_interactive
+    nextcmd="install deps"
+    setnextcmd
+    while read -p 'What next? ' -e -i "${nextcmd}" nextcmd; do
+        read verb target opts <<<"${nextcmd}"
+        case "${verb}" in
+            "" | "help")
+                help_interactive
+                ;;
+            "exit" | "quit")
+                exit_cleanly
+                ;;
+            "reset")
+                stop_services
+                ;;
+            *)
+                target="${target%/}"
+                testargs["$target"]="${opts}"
+                case "$target" in
+                    all | deps)
+                        ${verb}_${target}
+                        ;;
+                    *)
+                        tt="${testfuncargs[${target}]}"
+                        tt="${tt:-$target}"
+                        do_$verb $tt
+                        ;;
+                esac
+                ;;
+        esac
+        if [[ ${#successes[@]} -gt 0 || ${#failures[@]} -gt 0 ]]; then
+            report_outcomes
+            successes=()
+            failures=()
+        fi
+        cd "$WORKSPACE"
+        setnextcmd
+    done
+    echo
+fi
 exit_cleanly
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 36b06020748f43f5f4c7bbdefb5302935dedb861..7268f106a9f36ba933da51ecba4465ba760a8820 100644
@@ -62,6 +62,8 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                        ProbeInterval:      arvados.Duration(5 * time.Millisecond),
                        StaleLockTimeout:   arvados.Duration(5 * time.Millisecond),
                        MaxProbesPerSecond: 1000,
+                       TimeoutSignal:      arvados.Duration(3 * time.Millisecond),
+                       TimeoutTERM:        arvados.Duration(20 * time.Millisecond),
                },
                InstanceTypes: arvados.InstanceTypeMap{
                        test.InstanceType(1).Name:  test.InstanceType(1),
@@ -124,17 +126,20 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        for _, ctr := range queue.Containers {
                waiting[ctr.UUID] = struct{}{}
        }
-       executeContainer := func(ctr arvados.Container) int {
+       finishContainer := func(ctr arvados.Container) {
                mtx.Lock()
                defer mtx.Unlock()
                if _, ok := waiting[ctr.UUID]; !ok {
-                       c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
-                       return 1
+                       c.Errorf("container completed twice: %s", ctr.UUID)
+                       return
                }
                delete(waiting, ctr.UUID)
                if len(waiting) == 0 {
                        close(done)
                }
+       }
+       executeContainer := func(ctr arvados.Container) int {
+               finishContainer(ctr)
                return int(rand.Uint32() & 0x3)
        }
        n := 0
@@ -144,6 +149,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
                stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
                stubvm.ExecuteContainer = executeContainer
+               stubvm.CrashRunningContainer = finishContainer
                switch n % 7 {
                case 0:
                        stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
@@ -261,7 +267,13 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
        c.Check(ok, check.Equals, true)
        <-ch
 
-       sr = getInstances()
+       for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
+               sr = getInstances()
+               if len(sr.Items) > 0 {
+                       break
+               }
+               time.Sleep(time.Millisecond)
+       }
        c.Assert(len(sr.Items), check.Equals, 1)
        c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
        c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
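
The hunk above replaces a single getInstances() call with a retry loop: the stub instance appears asynchronously via the pool's background sync, so a one-shot check can race with it. The same retry-until-deadline idiom as a small standalone helper (a sketch with hypothetical names, not part of the Arvados test code):

    package main

    import (
        "fmt"
        "time"
    )

    // eventually polls cond until it returns true or timeout elapses,
    // checking once more after the deadline before giving up.
    func eventually(timeout, interval time.Duration, cond func() bool) bool {
        for deadline := time.Now().Add(timeout); time.Now().Before(deadline); {
            if cond() {
                return true
            }
            time.Sleep(interval)
        }
        return cond()
    }

    func main() {
        start := time.Now()
        ok := eventually(time.Second, time.Millisecond, func() bool {
            // stand-in for "len(sr.Items) > 0" in the test above
            return time.Since(start) > 20*time.Millisecond
        })
        fmt.Println("condition met:", ok)
    }
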
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
index 18cdc94fa52156ceab01d7dbe135d8db20029176..307807e32337257f14d02178cbd6e98de61f7be8 100644
@@ -38,7 +38,7 @@ type WorkerPool interface {
        Create(arvados.InstanceType) bool
        Shutdown(arvados.InstanceType) bool
        StartContainer(arvados.InstanceType, arvados.Container) bool
-       KillContainer(uuid string)
+       KillContainer(uuid, reason string)
        Subscribe() <-chan struct{}
        Unsubscribe(<-chan struct{})
 }
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 9f26877f552c0b6e35af4661deb859f7a5085503..dab324579dc84e9c8c93086fe693a638f029191f 100644
@@ -77,7 +77,7 @@ func (p *stubPool) Create(it arvados.InstanceType) bool {
        p.unalloc[it]++
        return true
 }
-func (p *stubPool) KillContainer(uuid string) {
+func (p *stubPool) KillContainer(uuid, reason string) {
        p.Lock()
        defer p.Unlock()
        delete(p.running, uuid)
diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go
index 28b9fd33857a388f6bd15aa1252a4797bec18fee..99bee484c6f7162a3e875b7627738a555fb46e13 100644
@@ -32,7 +32,7 @@ func (sch *Scheduler) sync() {
                        if !running {
                                go sch.cancel(uuid, "not running on any worker")
                        } else if !exited.IsZero() && qUpdated.After(exited) {
-                               go sch.cancel(uuid, "state=\"Running\" after crunch-run exited")
+                               go sch.cancel(uuid, "state=Running after crunch-run exited")
                        } else if ent.Container.Priority == 0 {
                                go sch.kill(uuid, "priority=0")
                        }
@@ -46,7 +46,7 @@ func (sch *Scheduler) sync() {
                                // of kill() will be to make the
                                // worker available for the next
                                // container.
-                               go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
                        } else {
                                sch.logger.WithFields(logrus.Fields{
                                        "ContainerUUID": uuid,
@@ -60,7 +60,7 @@ func (sch *Scheduler) sync() {
                                // a network outage and is still
                                // preparing to run a container that
                                // has already been unlocked/requeued.
-                               go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
                        }
                case arvados.ContainerStateLocked:
                        if running && !exited.IsZero() && qUpdated.After(exited) {
@@ -98,9 +98,7 @@ func (sch *Scheduler) cancel(uuid string, reason string) {
 }
 
 func (sch *Scheduler) kill(uuid string, reason string) {
-       logger := sch.logger.WithField("ContainerUUID", uuid)
-       logger.Debugf("killing crunch-run process because %s", reason)
-       sch.pool.KillContainer(uuid)
+       sch.pool.KillContainer(uuid, reason)
 }
 
 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go
index e18a2b5362b413a16eeb28984bfabbe34796f6db..6a6c952358bcadff447d08869e641f32d24618ad 100644
@@ -158,14 +158,21 @@ func (q *Queue) Update() error {
 //
 // The resulting changes are not exposed through Get() or Entries()
 // until the next call to Update().
-func (q *Queue) Notify(upd arvados.Container) {
+//
+// Return value is true unless the update is rejected (invalid state
+// transition).
+func (q *Queue) Notify(upd arvados.Container) bool {
        q.mtx.Lock()
        defer q.mtx.Unlock()
        for i, ctr := range q.Containers {
                if ctr.UUID == upd.UUID {
-                       q.Containers[i] = upd
-                       return
+                       if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+                               q.Containers[i] = upd
+                               return true
+                       }
+                       return false
                }
        }
        q.Containers = append(q.Containers, upd)
+       return true
 }
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 4df39d0c46ac7dc538e5c7d9949cd884df8768b4..a4521eab7bb9074b02a2f06ef50b781971099d3d 100644
@@ -126,6 +126,8 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
                tags:         copyTags(tags),
                providerType: it.ProviderType,
                initCommand:  cmd,
+               running:      map[string]int64{},
+               killing:      map[string]bool{},
        }
        svm.SSHService = SSHService{
                HostKey:        sis.driver.HostKey,
@@ -177,12 +179,13 @@ func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
 // running (and might change IP addresses, shut down, etc.)  without
 // updating any stubInstances that have been returned to callers.
 type StubVM struct {
-       Boot                 time.Time
-       Broken               time.Time
-       CrunchRunMissing     bool
-       CrunchRunCrashRate   float64
-       CrunchRunDetachDelay time.Duration
-       ExecuteContainer     func(arvados.Container) int
+       Boot                  time.Time
+       Broken                time.Time
+       CrunchRunMissing      bool
+       CrunchRunCrashRate    float64
+       CrunchRunDetachDelay  time.Duration
+       ExecuteContainer      func(arvados.Container) int
+       CrashRunningContainer func(arvados.Container)
 
        sis          *StubInstanceSet
        id           cloud.InstanceID
@@ -190,7 +193,9 @@ type StubVM struct {
        initCommand  cloud.InitCommand
        providerType string
        SSHService   SSHService
-       running      map[string]bool
+       running      map[string]int64
+       killing      map[string]bool
+       lastPID      int64
        sync.Mutex
 }
 
@@ -244,16 +249,16 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                        }
                }
                svm.Lock()
-               if svm.running == nil {
-                       svm.running = map[string]bool{}
-               }
-               svm.running[uuid] = true
+               svm.lastPID++
+               pid := svm.lastPID
+               svm.running[uuid] = pid
                svm.Unlock()
                time.Sleep(svm.CrunchRunDetachDelay)
                fmt.Fprintf(stderr, "starting %s\n", uuid)
                logger := svm.sis.logger.WithFields(logrus.Fields{
                        "Instance":      svm.id,
                        "ContainerUUID": uuid,
+                       "PID":           pid,
                })
                logger.Printf("[test] starting crunch-run stub")
                go func() {
@@ -263,37 +268,43 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                                logger.Print("[test] container not in queue")
                                return
                        }
+
+                       defer func() {
+                               if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil {
+                                       svm.CrashRunningContainer(ctr)
+                               }
+                       }()
+
                        if crashluck > svm.CrunchRunCrashRate/2 {
                                time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
                                ctr.State = arvados.ContainerStateRunning
-                               queue.Notify(ctr)
+                               if !queue.Notify(ctr) {
+                                       ctr, _ = queue.Get(uuid)
+                                       logger.Print("[test] erroring out because state=Running update was rejected")
+                                       return
+                               }
                        }
 
                        time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+
                        svm.Lock()
-                       _, running := svm.running[uuid]
-                       svm.Unlock()
-                       if !running {
+                       defer svm.Unlock()
+                       if svm.running[uuid] != pid {
                                logger.Print("[test] container was killed")
                                return
                        }
-                       if svm.ExecuteContainer != nil {
-                               ctr.ExitCode = svm.ExecuteContainer(ctr)
-                       }
-                       // TODO: Check whether the stub instance has
-                       // been destroyed, and if so, don't call
-                       // queue.Notify. Then "container finished
-                       // twice" can be classified as a bug.
+                       delete(svm.running, uuid)
+
                        if crashluck < svm.CrunchRunCrashRate {
-                               logger.Print("[test] crashing crunch-run stub")
+                               logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
                        } else {
+                               if svm.ExecuteContainer != nil {
+                                       ctr.ExitCode = svm.ExecuteContainer(ctr)
+                               }
+                               logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub")
                                ctr.State = arvados.ContainerStateComplete
-                               queue.Notify(ctr)
+                               go queue.Notify(ctr)
                        }
-                       logger.Print("[test] exiting crunch-run stub")
-                       svm.Lock()
-                       defer svm.Unlock()
-                       delete(svm.running, uuid)
                }()
                return 0
        }
@@ -307,13 +318,34 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
        }
        if strings.HasPrefix(command, "crunch-run --kill ") {
                svm.Lock()
-               defer svm.Unlock()
-               if svm.running[uuid] {
-                       delete(svm.running, uuid)
+               pid, running := svm.running[uuid]
+               if running && !svm.killing[uuid] {
+                       svm.killing[uuid] = true
+                       go func() {
+                               time.Sleep(time.Duration(math_rand.Float64()*30) * time.Millisecond)
+                               svm.Lock()
+                               defer svm.Unlock()
+                               if svm.running[uuid] == pid {
+                                       // Kill only if the running entry
+                                       // hasn't since been killed and
+                                       // replaced with a different one.
+                                       delete(svm.running, uuid)
+                               }
+                               delete(svm.killing, uuid)
+                       }()
+                       svm.Unlock()
+                       time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
+                       svm.Lock()
+                       _, running = svm.running[uuid]
+               }
+               svm.Unlock()
+               if running {
+                       fmt.Fprintf(stderr, "%s: container is running\n", uuid)
+                       return 1
                } else {
                        fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+                       return 0
                }
-               return 0
        }
        if command == "true" {
                return 0
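
The stub now tags each fake crunch-run invocation with a PID and compares that PID before deleting the entry, so a delayed "--kill" can no longer remove a newer process that reuses the same container UUID (and, conversely, the run goroutine can tell whether it was killed). A minimal sketch of the compare-before-delete idea with simplified stand-in types, not the real StubVM:

    package main

    import (
        "fmt"
        "sync"
    )

    // vm keeps, per container UUID, the PID of the fake crunch-run
    // process currently running it.
    type vm struct {
        mu      sync.Mutex
        lastPID int64
        running map[string]int64
    }

    func (v *vm) start(uuid string) int64 {
        v.mu.Lock()
        defer v.mu.Unlock()
        v.lastPID++
        v.running[uuid] = v.lastPID
        return v.lastPID
    }

    // kill removes the entry only if it still belongs to the process the
    // caller targeted; a newer process under the same UUID is left alone.
    func (v *vm) kill(uuid string, pid int64) bool {
        v.mu.Lock()
        defer v.mu.Unlock()
        if v.running[uuid] == pid {
            delete(v.running, uuid)
            return true
        }
        return false
    }

    func main() {
        v := &vm{running: map[string]int64{}}
        old := v.start("ctr1")
        fmt.Println(v.kill("ctr1", old)) // true: removes the old process
        newer := v.start("ctr1")
        fmt.Println(v.kill("ctr1", old), v.kill("ctr1", newer)) // false true
    }
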
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index e90935e2aa9e5747d08e136475cd186c0b4bc766..81a658535ea593a7a0a0c2d9231fd66f3055bdbd 100644
@@ -68,6 +68,8 @@ const (
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
+       defaultTimeoutTERM        = time.Minute * 2
+       defaultTimeoutSignal      = time.Second * 5
 
        // Time after a quota error to try again anyway, even if no
        // instances have been shutdown.
@@ -105,6 +107,8 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               timeoutTERM:        duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+               timeoutSignal:      duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
                installPublicKey:   installPublicKey,
                stop:               make(chan bool),
        }
@@ -136,6 +140,8 @@ type Pool struct {
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
+       timeoutTERM        time.Duration
+       timeoutSignal      time.Duration
        installPublicKey   ssh.PublicKey
 
        // private state
@@ -319,9 +325,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
        if !ok {
                return errors.New("requested instance does not exist")
        }
-       wkr.idleBehavior = idleBehavior
-       wkr.saveTags()
-       wkr.shutdownIfIdle()
+       wkr.setIdleBehavior(idleBehavior)
        return nil
 }
 
@@ -383,19 +387,14 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor
                probed:       now,
                busy:         now,
                updated:      now,
-               running:      make(map[string]struct{}),
-               starting:     make(map[string]struct{}),
+               running:      make(map[string]*remoteRunner),
+               starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
 }
 
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
-       wp.exited[uuid] = t
-}
-
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
@@ -486,53 +485,29 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Reason":        reason,
+       })
        if _, ok := wp.exited[uuid]; ok {
-               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               logger.Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
-               if _, ok := wkr.running[uuid]; ok {
-                       go wp.kill(wkr, uuid)
-                       return
+               rr := wkr.running[uuid]
+               if rr == nil {
+                       rr = wkr.starting[uuid]
                }
-       }
-       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
-       logger := wp.logger.WithFields(logrus.Fields{
-               "ContainerUUID": uuid,
-               "Instance":      wkr.instance.ID(),
-       })
-       logger.Debug("killing process")
-       cmd := "crunch-run --kill 15 " + uuid
-       if u := wkr.instance.RemoteUser(); u != "root" {
-               cmd = "sudo " + cmd
-       }
-       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
-       if err != nil {
-               logger.WithFields(logrus.Fields{
-                       "stderr": string(stderr),
-                       "stdout": string(stdout),
-                       "error":  err,
-               }).Warn("kill failed")
-               return
-       }
-       logger.Debug("killing process succeeded")
-       wp.mtx.Lock()
-       defer wp.mtx.Unlock()
-       if _, ok := wkr.running[uuid]; ok {
-               delete(wkr.running, uuid)
-               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
-                       wkr.state = StateIdle
+               if rr != nil {
+                       rr.Kill(reason)
+                       return
                }
-               wkr.updated = time.Now()
-               go wp.notify()
        }
+       logger.Debug("cannot kill: already disappeared")
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
@@ -785,7 +760,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
-               go wkr.executor.Close()
+               go wkr.Close()
                notify = true
        }
 
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
new file mode 100644
index 0000000..c30ff9f
--- /dev/null
@@ -0,0 +1,154 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "syscall"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
+)
+
+// remoteRunner handles the starting and stopping of a crunch-run
+// process on a remote machine.
+type remoteRunner struct {
+       uuid          string
+       executor      Executor
+       arvClient     *arvados.Client
+       remoteUser    string
+       timeoutTERM   time.Duration
+       timeoutSignal time.Duration
+       onUnkillable  func(uuid string) // callback invoked when giving up on SIGTERM
+       onKilled      func(uuid string) // callback invoked when process exits after SIGTERM
+       logger        logrus.FieldLogger
+
+       stopping bool          // true if Stop() has been called
+       givenup  bool          // true if timeoutTERM has been reached
+       closed   chan struct{} // channel is closed if Close() has been called
+}
+
+// newRemoteRunner returns a new remoteRunner. Caller should ensure
+// Close() is called to release resources.
+func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+       rr := &remoteRunner{
+               uuid:          uuid,
+               executor:      wkr.executor,
+               arvClient:     wkr.wp.arvClient,
+               remoteUser:    wkr.instance.RemoteUser(),
+               timeoutTERM:   wkr.wp.timeoutTERM,
+               timeoutSignal: wkr.wp.timeoutSignal,
+               onUnkillable:  wkr.onUnkillable,
+               onKilled:      wkr.onKilled,
+               logger:        wkr.logger.WithField("ContainerUUID", uuid),
+               closed:        make(chan struct{}),
+       }
+       return rr
+}
+
+// Start a crunch-run process on the remote host.
+//
+// Start does not return any error encountered. The caller should
+// assume the remote process _might_ have started, at least until it
+// probes the worker and finds otherwise.
+func (rr *remoteRunner) Start() {
+       env := map[string]string{
+               "ARVADOS_API_HOST":  rr.arvClient.APIHost,
+               "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
+       }
+       if rr.arvClient.Insecure {
+               env["ARVADOS_API_HOST_INSECURE"] = "1"
+       }
+       envJSON, err := json.Marshal(env)
+       if err != nil {
+               panic(err)
+       }
+       stdin := bytes.NewBuffer(envJSON)
+       cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+       if rr.remoteUser != "root" {
+               cmd = "sudo " + cmd
+       }
+       stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
+       if err != nil {
+               rr.logger.WithField("stdout", string(stdout)).
+                       WithField("stderr", string(stderr)).
+                       WithError(err).
+                       Error("error starting crunch-run process")
+               return
+       }
+       rr.logger.Info("crunch-run process started")
+}
+
+// Close abandons the remote process (if any) and releases
+// resources. Close must not be called more than once.
+func (rr *remoteRunner) Close() {
+       close(rr.closed)
+}
+
+// Kill starts a background task to kill the remote process, first
+// trying SIGTERM until reaching timeoutTERM, then calling
+// onUnkillable().
+//
+// SIGKILL is not used. It would merely kill the crunch-run supervisor
+// and thereby make the docker container, arv-mount, etc. invisible to
+// us without actually stopping them.
+//
+// Once Kill has been called, calling it again has no effect.
+func (rr *remoteRunner) Kill(reason string) {
+       if rr.stopping {
+               return
+       }
+       rr.stopping = true
+       rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
+       go func() {
+               termDeadline := time.Now().Add(rr.timeoutTERM)
+               t := time.NewTicker(rr.timeoutSignal)
+               defer t.Stop()
+               for range t.C {
+                       switch {
+                       case rr.isClosed():
+                               return
+                       case time.Now().After(termDeadline):
+                               rr.logger.Debug("giving up")
+                               rr.givenup = true
+                               rr.onUnkillable(rr.uuid)
+                               return
+                       default:
+                               rr.kill(syscall.SIGTERM)
+                       }
+               }
+       }()
+}
+
+func (rr *remoteRunner) kill(sig syscall.Signal) {
+       logger := rr.logger.WithField("Signal", int(sig))
+       logger.Info("sending signal")
+       cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+       if rr.remoteUser != "root" {
+               cmd = "sudo " + cmd
+       }
+       stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
+       if err != nil {
+               logger.WithFields(logrus.Fields{
+                       "stderr": string(stderr),
+                       "stdout": string(stdout),
+                       "error":  err,
+               }).Info("kill attempt unsuccessful")
+               return
+       }
+       rr.onKilled(rr.uuid)
+}
+
+func (rr *remoteRunner) isClosed() bool {
+       select {
+       case <-rr.closed:
+               return true
+       default:
+               return false
+       }
+}
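
The Kill/Close machinery above combines two small Go idioms: a closed channel
checked with a non-blocking select (isClosed), and a signal-retry loop driven
by a ticker and bounded by a deadline (the goroutine started by Kill). Below is
a minimal standalone sketch of those idioms, not part of this commit; the
runner type, killLoop helper, timings, and printed messages are invented for
illustration and are not the dispatcher's API.

package main

import (
	"fmt"
	"time"
)

type runner struct {
	closed        chan struct{}
	timeoutSignal time.Duration
	timeoutTERM   time.Duration
}

// isClosed reports, without blocking, whether close(r.closed) has been
// called -- the same select/default idiom as remoteRunner.isClosed.
func (r *runner) isClosed() bool {
	select {
	case <-r.closed:
		return true
	default:
		return false
	}
}

// killLoop mirrors the shape of the goroutine started by remoteRunner.Kill:
// call send() every timeoutSignal until the runner is closed or timeoutTERM
// elapses.
func (r *runner) killLoop(send func()) {
	termDeadline := time.Now().Add(r.timeoutTERM)
	t := time.NewTicker(r.timeoutSignal)
	defer t.Stop()
	for range t.C {
		switch {
		case r.isClosed():
			fmt.Println("runner closed, stop signalling")
			return
		case time.Now().After(termDeadline):
			fmt.Println("giving up")
			return
		default:
			send()
		}
	}
}

func main() {
	r := &runner{
		closed:        make(chan struct{}),
		timeoutSignal: 100 * time.Millisecond,
		timeoutTERM:   350 * time.Millisecond,
	}
	r.killLoop(func() { fmt.Println("would send SIGTERM") })
}
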
index 64e1f7797af8634be63502faea5faaaa8b30a5f9..41117c1d4eafb5aa2a92c163d3f79d72ace443d3 100644 (file)
@@ -6,7 +6,6 @@ package worker
 
 import (
        "bytes"
-       "encoding/json"
        "fmt"
        "strings"
        "sync"
@@ -87,62 +86,60 @@ type worker struct {
        busy         time.Time
        destroyed    time.Time
        lastUUID     string
-       running      map[string]struct{} // remember to update state idle<->running when this changes
-       starting     map[string]struct{} // remember to update state idle<->running when this changes
+       running      map[string]*remoteRunner // remember to update state idle<->running when this changes
+       starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
        probing      chan struct{}
 }
 
+func (wkr *worker) onUnkillable(uuid string) {
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       logger := wkr.logger.WithField("ContainerUUID", uuid)
+       if wkr.idleBehavior == IdleBehaviorHold {
+               logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+               return
+       }
+       logger.Warn("unkillable container, draining worker")
+       wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       wkr.closeRunner(uuid)
+       go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+       wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+       wkr.idleBehavior = idleBehavior
+       wkr.saveTags()
+       wkr.shutdownIfIdle()
+}
+
 // caller must have lock.
 func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
-       logger = logger.WithField("Instance", wkr.instance.ID())
        logger.Debug("starting container")
-       wkr.starting[ctr.UUID] = struct{}{}
+       rr := newRemoteRunner(ctr.UUID, wkr)
+       wkr.starting[ctr.UUID] = rr
        if wkr.state != StateRunning {
                wkr.state = StateRunning
                go wkr.wp.notify()
        }
        go func() {
-               env := map[string]string{
-                       "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
-                       "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
-               }
-               if wkr.wp.arvClient.Insecure {
-                       env["ARVADOS_API_HOST_INSECURE"] = "1"
-               }
-               envJSON, err := json.Marshal(env)
-               if err != nil {
-                       panic(err)
-               }
-               stdin := bytes.NewBuffer(envJSON)
-               cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
-               if u := wkr.instance.RemoteUser(); u != "root" {
-                       cmd = "sudo " + cmd
-               }
-               stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
+               rr.Start()
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
-               wkr.running[ctr.UUID] = struct{}{}
-               wkr.lastUUID = ctr.UUID
-               if err != nil {
-                       logger.WithField("stdout", string(stdout)).
-                               WithField("stderr", string(stderr)).
-                               WithError(err).
-                               Error("error starting crunch-run process")
-                       // Leave uuid in wkr.running, though: it's
-                       // possible the error was just a communication
-                       // failure and the process was in fact
-                       // started.  Wait for next probe to find out.
-                       return
-               }
-               logger.Info("crunch-run process started")
+               wkr.running[ctr.UUID] = rr
                wkr.lastUUID = ctr.UUID
        }()
 }
@@ -274,31 +271,8 @@ func (wkr *worker) probeAndUpdate() {
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }
-       changed := false
 
-       // Build a new "running" map. Set changed=true if it differs
-       // from the existing map (wkr.running) to ensure the scheduler
-       // gets notified below.
-       running := map[string]struct{}{}
-       for _, uuid := range ctrUUIDs {
-               running[uuid] = struct{}{}
-               if _, ok := wkr.running[uuid]; !ok {
-                       if _, ok := wkr.starting[uuid]; !ok {
-                               // We didn't start it -- it must have
-                               // been started by a previous
-                               // dispatcher process.
-                               logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
-                       }
-                       changed = true
-               }
-       }
-       for uuid := range wkr.running {
-               if _, ok := running[uuid]; !ok {
-                       logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
-                       wkr.wp.notifyExited(uuid, updateTime)
-                       changed = true
-               }
-       }
+       changed := wkr.updateRunning(ctrUUIDs)
 
        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
@@ -317,14 +291,13 @@ func (wkr *worker) probeAndUpdate() {
 
        // Log whenever a run-probe reveals crunch-run processes
        // appearing/disappearing before boot-probe succeeds.
-       if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+       if wkr.state == StateUnknown && changed {
                logger.WithFields(logrus.Fields{
-                       "RunningContainers": len(running),
+                       "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }
 
-       wkr.running = running
        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
@@ -333,7 +306,7 @@ func (wkr *worker) probeAndUpdate() {
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
                logger.WithFields(logrus.Fields{
-                       "RunningContainers": len(running),
+                       "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
@@ -402,27 +375,52 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        return true
 }
 
+// Returns true if the instance is eligible for shutdown: it has been idle too
+// long, or idleBehavior=Drain and all remaining runners have given up.
+//
 // caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
-               // Never shut down.
                return false
        }
-       age := time.Since(wkr.busy)
-
-       old := age >= wkr.wp.timeoutIdle
        draining := wkr.idleBehavior == IdleBehaviorDrain
-       shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
-               (draining && wkr.state == StateBooting)
-       if !shouldShutdown {
+       switch wkr.state {
+       case StateBooting:
+               return draining
+       case StateIdle:
+               return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+       case StateRunning:
+               if !draining {
+                       return false
+               }
+               for _, rr := range wkr.running {
+                       if !rr.givenup {
+                               return false
+                       }
+               }
+               for _, rr := range wkr.starting {
+                       if !rr.givenup {
+                               return false
+                       }
+               }
+               // draining, and every remaining runner has already
+               // given up on stopping its crunch-run process
+               return true
+       default:
                return false
        }
+}
 
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+       if !wkr.eligibleForShutdown() {
+               return false
+       }
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
-               "IdleDuration": stats.Duration(age),
+               "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
-       }).Info("shutdown idle worker")
+       }).Info("shutdown worker")
        wkr.shutdown()
        return true
 }
@@ -468,3 +466,68 @@ func (wkr *worker) saveTags() {
                }()
        }
 }
+
+func (wkr *worker) Close() {
+       // This might take time, so do it after unlocking mtx.
+       defer wkr.executor.Close()
+
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       for uuid, rr := range wkr.running {
+               wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+               rr.Close()
+       }
+       for uuid, rr := range wkr.starting {
+               wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+               rr.Close()
+       }
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+       alive := map[string]bool{}
+       for _, uuid := range ctrUUIDs {
+               alive[uuid] = true
+               if _, ok := wkr.running[uuid]; ok {
+                       // unchanged
+               } else if rr, ok := wkr.starting[uuid]; ok {
+                       wkr.running[uuid] = rr
+                       delete(wkr.starting, uuid)
+                       changed = true
+               } else {
+                       // We didn't start it -- it must have been
+                       // started by a previous dispatcher process.
+                       wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+                       wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+                       changed = true
+               }
+       }
+       for uuid := range wkr.running {
+               if !alive[uuid] {
+                       wkr.closeRunner(uuid)
+                       changed = true
+               }
+       }
+       return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+       rr := wkr.running[uuid]
+       if rr == nil {
+               return
+       }
+       wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+       delete(wkr.running, uuid)
+       rr.Close()
+
+       now := time.Now()
+       wkr.updated = now
+       wkr.wp.exited[uuid] = now
+       if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+               wkr.state = StateIdle
+       }
+}
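
updateRunning above is a reconcile step: it brings the dispatcher's map of
runners in line with the UUID list returned by a probe, and reports whether
anything changed. The following standalone sketch (not part of this commit)
shows the same reconcile-a-map-against-a-list pattern in isolation; reconcile
and the example keys are invented names, and the worker-specific bookkeeping
(the starting map, notifyExited, state transitions) is deliberately left out.

package main

import "fmt"

// reconcile adds keys that appear in aliveList and drops keys that do not,
// returning true if the map changed -- the same shape as updateRunning.
func reconcile(known map[string]bool, aliveList []string) bool {
	changed := false
	alive := map[string]bool{}
	for _, k := range aliveList {
		alive[k] = true
		if !known[k] {
			known[k] = true
			changed = true
		}
	}
	for k := range known {
		if !alive[k] {
			delete(known, k)
			changed = true
		}
	}
	return changed
}

func main() {
	known := map[string]bool{"a": true, "b": true}
	fmt.Println(reconcile(known, []string{"b", "c"})) // true: "c" added, "a" dropped
	fmt.Println(known)                                // map[b:true c:true]
}
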
index 3bc33b62c9fee896f107278a564b859d1448366e..15a2a894c5bceb89bdae4f6c5e4146d317dca083 100644 (file)
@@ -209,12 +209,17 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                        busy:     ctime,
                        probed:   ctime,
                        updated:  ctime,
+                       running:  map[string]*remoteRunner{},
+                       starting: map[string]*remoteRunner{},
+                       probing:  make(chan struct{}, 1),
                }
                if trial.running > 0 {
-                       wkr.running = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+                       uuid := "zzzzz-dz642-abcdefghijklmno"
+                       wkr.running = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
                }
                if trial.starting > 0 {
-                       wkr.starting = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+                       uuid := "zzzzz-dz642-bcdefghijklmnop"
+                       wkr.starting = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
                }
                wkr.probeAndUpdate()
                c.Check(wkr.state, check.Equals, trial.expectState)
index f16f98a943cdbe2f0501a35d95cb3e45e9c9d5a9..73addb739cd0e25b47212300d420fa4f2b8c8e7d 100644 (file)
@@ -122,6 +122,12 @@ type Dispatch struct {
 
        // Maximum total worker probes per second
        MaxProbesPerSecond int
+
+       // Interval between SIGTERM attempts when killing a container
+       TimeoutSignal Duration
+
+       // Time to keep trying SIGTERM before giving up and draining the worker
+       TimeoutTERM Duration
 }
 
 type CloudVMs struct {
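
The two new fields use the SDK's Duration type, which elsewhere in this
codebase is typically written in config files as a string such as "5s" or
"2m". Below is a hedged sketch (not part of this commit) of that kind of
decoding: the Duration and Dispatch types are local stand-ins defined just for
the example, and the values "5s"/"2m" are invented, not defaults. Presumably
(the pool.go changes are not shown in this excerpt) the decoded values feed
the timeoutSignal/timeoutTERM used by remoteRunner above.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Duration is a stand-in for the SDK type: a time.Duration that
// unmarshals from a quoted string such as "5s".
type Duration time.Duration

func (d *Duration) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	dur, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	*d = Duration(dur)
	return nil
}

// Dispatch holds just the two new timeouts for this example.
type Dispatch struct {
	TimeoutSignal Duration
	TimeoutTERM   Duration
}

func main() {
	var d Dispatch
	err := json.Unmarshal([]byte(`{"TimeoutSignal":"5s","TimeoutTERM":"2m"}`), &d)
	fmt.Println(err, time.Duration(d.TimeoutSignal), time.Duration(d.TimeoutTERM))
	// prints: <nil> 5s 2m0s
}
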
index b3c530e69013e91dfb7599c677347ecef3856d78..933692bdc55b3bbf9c63e78c29c5615418a33d05 100644 (file)
@@ -116,14 +116,21 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
 
        proc, err := os.FindProcess(pi.PID)
        if err != nil {
+               // On UNIX, FindProcess always succeeds, even if the
+               // process does not exist, so an error is unexpected.
                return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
        }
 
+       // Send the requested signal once, then poll with signal 0
+       // (every 10ms).  When proc.Signal() returns an error (process
+       // no longer exists), return success.  If that doesn't happen
+       // within 1 second, return an error.
        err = proc.Signal(signal)
        for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
                err = proc.Signal(syscall.Signal(0))
        }
        if err == nil {
+               // Reached deadline without a proc.Signal() error.
                return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
        }
        fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
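
The pattern in kill() above -- send the real signal once, then use signal 0
purely as an existence check until it fails or a deadline passes -- can be
exercised on its own. The sketch below is not part of this commit and assumes
a POSIX system with a sleep binary; the child process and timings are invented
for illustration. Unlike the detached crunch-run process, a child must also be
reaped with Wait so os.Process notices it is gone.

package main

import (
	"fmt"
	"os"
	"os/exec"
	"syscall"
	"time"
)

func main() {
	// Start a child process to stand in for the process being killed.
	cmd := exec.Command("sleep", "10")
	if err := cmd.Start(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	proc := cmd.Process
	// Reap the child when it exits, so os.Process can report that it
	// is gone (otherwise it would linger as a zombie).
	go cmd.Wait()

	// Send SIGTERM once, then poll with signal 0 until Signal()
	// returns an error (process gone) or a deadline passes.
	err := proc.Signal(syscall.SIGTERM)
	for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
		err = proc.Signal(syscall.Signal(0))
	}
	if err == nil {
		fmt.Println("process is still alive after the deadline")
	} else {
		fmt.Println("process is gone:", err)
	}
}

In background.go itself the target is a detached crunch-run located via its
pidfile rather than a child process, so there is nothing to Wait for and
signal 0 simply starts failing once the process exits.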