Merge remote-tracking branch 'origin/master' into 14645-fuse-operations-reporting
author Eric Biagiotti <ebiagiotti@veritasgenetics.com>
Thu, 21 Mar 2019 18:03:16 +0000 (14:03 -0400)
committer Eric Biagiotti <ebiagiotti@veritasgenetics.com>
Thu, 21 Mar 2019 18:03:16 +0000 (14:03 -0400)
refs #14645

Arvados-DCO-1.1-Signed-off-by: Eric Biagiotti <ebiagiotti@veritasgenetics.com>

42 files changed:
apps/workbench/app/controllers/container_requests_controller.rb
apps/workbench/app/helpers/application_helper.rb
build/run-library.sh
build/run-tests.sh
doc/user/cwl/bwa-mem/bwa-mem-input-mixed.yml [new file with mode: 0755]
doc/user/cwl/bwa-mem/bwa-mem-input-uuids.yml [new file with mode: 0755]
doc/user/cwl/cwl-runner.html.textile.liquid
lib/dispatchcloud/container/queue.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/scheduler/fix_stale_locks.go
lib/dispatchcloud/scheduler/interfaces.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/scheduler/sync.go
lib/dispatchcloud/test/queue.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/runner.go [new file with mode: 0644]
lib/dispatchcloud/worker/worker.go
lib/dispatchcloud/worker/worker_test.go
sdk/cli/arvados-cli.gemspec
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/arvados_cwl/util.py
sdk/cwl/tests/submit_test_job_with_inconsistent_uuids.json [new file with mode: 0644]
sdk/cwl/tests/submit_test_job_with_mismatched_uuids.json [new file with mode: 0644]
sdk/cwl/tests/submit_test_job_with_uuids.json [new file with mode: 0644]
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_submit.py
sdk/go/arvados/config.go
sdk/ruby/arvados.gemspec
services/api/test/integration/groups_test.rb
services/crunch-run/background.go
tools/arvbox/bin/arvbox
tools/arvbox/lib/arvbox/docker/Dockerfile.base
tools/arvbox/lib/arvbox/docker/Dockerfile.dev
tools/arvbox/lib/arvbox/docker/common.sh
tools/arvbox/lib/arvbox/docker/createusers.sh
tools/arvbox/lib/arvbox/docker/devenv.sh [new file with mode: 0755]

diff --git a/apps/workbench/app/controllers/container_requests_controller.rb b/apps/workbench/app/controllers/container_requests_controller.rb
index 783cafa117d9c23b3641f5f9883b90ea066be384..454be448d9d1e7afad061ac983cd38780abd1365 100644 (file)
@@ -120,7 +120,7 @@ class ContainerRequestsController < ApplicationController
             c = Collection.find(re[1])
             input_obj[param_id] = {"class" => primary_type,
                                    "location" => "keep:#{c.portable_data_hash}#{re[4]}",
-                                   "arv:collection" => input_obj[param_id]}
+                                   "http://arvados.org/cwl#collectionUUID" => re[1]}
           end
         end
       end
diff --git a/apps/workbench/app/helpers/application_helper.rb b/apps/workbench/app/helpers/application_helper.rb
index 15bf77fa094f188e5b3f8be980c5c57e3d73bcfe..4c4b5ff34df52c471fa2ceaf566e8f9a5b606d02 100644 (file)
@@ -495,11 +495,12 @@ module ApplicationHelper
       chooser_title = "Choose a #{primary_type == 'Directory' ? 'dataset' : 'file'}:"
       selection_param = object.class.to_s.underscore + dn
       if attrvalue.is_a? Hash
-        display_value = attrvalue[:"arv:collection"] || attrvalue[:location]
+        display_value = attrvalue[:"http://arvados.org/cwl#collectionUUID"] || attrvalue[:"arv:collection"] || attrvalue[:location]
         re = CollectionsHelper.match_uuid_with_optional_filepath(display_value)
+        locationre = CollectionsHelper.match(attrvalue[:location][5..-1])
         if re
-          if re[4]
-            display_value = "#{Collection.find(re[1]).name} / #{re[4][1..-1]}"
+          if locationre and locationre[4]
+            display_value = "#{Collection.find(re[1]).name} / #{locationre[4][1..-1]}"
           else
             display_value = Collection.find(re[1]).name
           end
@@ -677,8 +678,8 @@ module ApplicationHelper
     render_runtime duration, use_words, round_to_min
   end
 
-  # Keep locators are expected to be of the form \"...<pdh/file_path>\"
-  JSON_KEEP_LOCATOR_REGEXP = /([0-9a-f]{32}\+\d+[^'"]*?)(?=['"]|\z|$)/
+  # Keep locators are expected to be of the form \"...<pdh/file_path>\" or \"...<uuid/file_path>\"
+  JSON_KEEP_LOCATOR_REGEXP = /([0-9a-f]{32}\+\d+[^'"]*|[a-z0-9]{5}-4zz18-[a-z0-9]{15}[^'"]*)(?=['"]|\z|$)/
   def keep_locator_in_json str
     # Return a list of all matches
     str.scan(JSON_KEEP_LOCATOR_REGEXP).flatten
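
The updated JSON_KEEP_LOCATOR_REGEXP accepts either a portable data hash locator (32 hex digits, "+", and a size) or a collection UUID of the form xxxxx-4zz18-xxxxxxxxxxxxxxx. A self-contained Go sketch of the same two locator shapes, for illustration only (the sample locators are placeholders, and this anchored regexp matches whole strings rather than scanning JSON text):

    package main

    import (
        "fmt"
        "regexp"
    )

    // Either a portable data hash ("<32 hex>+<size>") or a collection
    // UUID ("xxxxx-4zz18-<15 chars>"), optionally followed by a file path.
    var keepLocator = regexp.MustCompile(`^([0-9a-f]{32}\+\d+|[a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$`)

    func main() {
        for _, s := range []string{
            "2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt",
            "zzzzz-4zz18-zzzzzzzzzzzzzzz/19.fasta.bwt",
        } {
            if m := keepLocator.FindStringSubmatch(s); m != nil {
                fmt.Printf("collection=%s path=%s\n", m[1], m[2])
            }
        }
    }
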
diff --git a/build/run-library.sh b/build/run-library.sh
index de9d67d41cbe3f94a6bb9ca1c9ac1a819051dc35..1daceff2393537485d0dd51f381648076f43d512 100755 (executable)
@@ -826,14 +826,13 @@ install_package() {
   fi
 }
 
-title () {
-    txt="********** $1 **********"
-    printf "\n%*s%s\n\n" $((($COLUMNS-${#txt})/2)) "" "$txt"
+title() {
+    printf '%s %s\n' "=======" "$1"
 }
 
 checkexit() {
     if [[ "$1" != "0" ]]; then
-        title "!!!!!! $2 FAILED !!!!!!"
+        title "$2 -- FAILED"
         failures+=("$2 (`timer`)")
     else
         successes+=("$2 (`timer`)")
@@ -856,7 +855,9 @@ report_outcomes() {
 
     if [[ ${#failures[@]} == 0 ]]
     then
-        echo "All test suites passed."
+        if [[ ${#successes[@]} != 0 ]]; then
+           echo "All test suites passed."
+        fi
     else
         echo "Failures (${#failures[@]}):"
         for x in "${failures[@]}"
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 095d32eaae39de11bc031619d121bbe16562c2ed..d4afb52fa34382a33bca323ebbdf26b56e89efd4 100755 (executable)
@@ -19,6 +19,10 @@ Syntax:
 Options:
 
 --skip FOO     Do not test the FOO component.
+--skip sanity  Skip initial dev environment sanity checks.
+--skip install Do not run any install steps. Just run tests.
+               You should provide GOPATH, GEMHOME, and VENVDIR options
+               from a previous invocation if you use this option.
 --only FOO     Do not test anything except the FOO component.
 --temp DIR     Install components and dependencies under DIR instead of
                making a new temporary directory. Implies --leave-temp.
@@ -27,11 +31,9 @@ Options:
                subsequent invocations.
 --repeat N     Repeat each install/test step until it succeeds N times.
 --retry        Prompt to retry if an install or test suite fails.
---skip-install Do not run any install steps. Just run tests.
-               You should provide GOPATH, GEMHOME, and VENVDIR options
-               from a previous invocation if you use this option.
 --only-install Run specific install step
 --short        Skip (or scale down) some slow tests.
+--interactive  Set up, then prompt for test/install steps to perform.
 WORKSPACE=path Arvados source tree to test.
 CONFIGSRC=path Dir with api server config files to copy into source tree.
                (If none given, leave config files alone in source tree.)
@@ -49,7 +51,7 @@ ARVADOS_DEBUG=1
 envvar=value   Set \$envvar to value. Primarily useful for WORKSPACE,
                *_test, and other examples shown above.
 
-Assuming --skip-install is not given, all components are installed
+Assuming "--skip install" is not given, all components are installed
 into \$GOPATH, \$VENDIR, and \$GEMHOME before running any tests. Many
 test suites depend on other components being installed, and installing
 everything tends to be quicker than debugging dependencies.
@@ -169,7 +171,9 @@ fatal() {
 
 exit_cleanly() {
     trap - INT
-    create-plot-data-from-log.sh $BUILD_NUMBER "$WORKSPACE/apps/workbench/log/test.log" "$WORKSPACE/apps/workbench/log/"
+    if which create-plot-data-from-log.sh >/dev/null; then
+        create-plot-data-from-log.sh $BUILD_NUMBER "$WORKSPACE/apps/workbench/log/test.log" "$WORKSPACE/apps/workbench/log/"
+    fi
     rotate_logfile "$WORKSPACE/apps/workbench/log/" "test.log"
     stop_services
     rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
@@ -179,6 +183,7 @@ exit_cleanly() {
 }
 
 sanity_checks() {
+    [[ -n "${skip[sanity]}" ]] && return 0
     ( [[ -n "$WORKSPACE" ]] && [[ -d "$WORKSPACE/services" ]] ) \
         || fatal "WORKSPACE environment variable not set to a source directory (see: $0 --help)"
     echo Checking dependencies:
@@ -303,8 +308,11 @@ do
         --short)
             short=1
             ;;
+        --interactive)
+            interactive=1
+            ;;
         --skip-install)
-            only_install=nothing
+            skip[install]=1
             ;;
         --only-install)
             only_install="$1"; shift
@@ -355,6 +363,10 @@ if [[ $NEED_SDK_R == false ]]; then
 fi
 
 start_services() {
+    if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
+        return 0
+    fi
+    . "$VENVDIR/bin/activate"
     echo 'Starting API, keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
     if [[ ! -d "$WORKSPACE/services/api/log" ]]; then
        mkdir -p "$WORKSPACE/services/api/log"
@@ -363,8 +375,10 @@ start_services() {
     if [[ -f "$WORKSPACE/tmp/api.pid" && ! -s "$WORKSPACE/tmp/api.pid" ]]; then
        rm -f "$WORKSPACE/tmp/api.pid"
     fi
+    all_services_stopped=
+    fail=0
     cd "$WORKSPACE" \
-        && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo fail=1) \
+        && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo "fail=1; false") \
         && export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
         && export ARVADOS_TEST_API_INSTALLED="$$" \
         && python sdk/python/tests/run_test_server.py start_controller \
@@ -372,18 +386,22 @@ start_services() {
         && python sdk/python/tests/run_test_server.py start_keep-web \
         && python sdk/python/tests/run_test_server.py start_arv-git-httpd \
         && python sdk/python/tests/run_test_server.py start_ws \
-        && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo fail=1) \
-        && (env | egrep ^ARVADOS)
-    if [[ -n "$fail" ]]; then
-       return 1
+        && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo "fail=1; false") \
+        && (env | egrep ^ARVADOS) \
+        || fail=1
+    deactivate
+    if [[ $fail != 0 ]]; then
+        unset ARVADOS_TEST_API_HOST
     fi
+    return $fail
 }
 
 stop_services() {
-    if [[ -z "$ARVADOS_TEST_API_HOST" ]]; then
+    if [[ -n "$all_services_stopped" ]]; then
         return
     fi
     unset ARVADOS_TEST_API_HOST
+    . "$VENVDIR/bin/activate" || return
     cd "$WORKSPACE" \
         && python sdk/python/tests/run_test_server.py stop_nginx \
         && python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
@@ -391,7 +409,9 @@ stop_services() {
         && python sdk/python/tests/run_test_server.py stop_keep-web \
         && python sdk/python/tests/run_test_server.py stop_keep_proxy \
         && python sdk/python/tests/run_test_server.py stop_controller \
-        && python sdk/python/tests/run_test_server.py stop
+        && python sdk/python/tests/run_test_server.py stop \
+        && all_services_stopped=1
+    deactivate
 }
 
 interrupt() {
@@ -400,36 +420,6 @@ interrupt() {
 }
 trap interrupt INT
 
-sanity_checks
-
-echo "WORKSPACE=$WORKSPACE"
-
-if [[ -z "$CONFIGSRC" ]] && [[ -d "$HOME/arvados-api-server" ]]; then
-    # Jenkins expects us to use this by default.
-    CONFIGSRC="$HOME/arvados-api-server"
-fi
-
-# Clean up .pyc files that may exist in the workspace
-cd "$WORKSPACE"
-find -name '*.pyc' -delete
-
-if [[ -z "$temp" ]]; then
-    temp="$(mktemp -d)"
-fi
-
-# Set up temporary install dirs (unless existing dirs were supplied)
-for tmpdir in VENVDIR VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
-do
-    if [[ -z "${!tmpdir}" ]]; then
-        eval "$tmpdir"="$temp/$tmpdir"
-    fi
-    if ! [[ -d "${!tmpdir}" ]]; then
-        mkdir "${!tmpdir}" || fatal "can't create ${!tmpdir} (does $temp exist?)"
-    fi
-done
-
-rm -vf "${WORKSPACE}/tmp/*.log"
-
 setup_ruby_environment() {
     if [[ -s "$HOME/.rvm/scripts/rvm" ]] ; then
         source "$HOME/.rvm/scripts/rvm"
@@ -528,114 +518,136 @@ setup_virtualenv() {
     "$venvdest/bin/pip" install --no-cache-dir 'mock>=1.0' 'pbr<1.7.0'
 }
 
-export PERLINSTALLBASE
-export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
+initialize() {
+    sanity_checks
 
-export R_LIBS
+    echo "WORKSPACE=$WORKSPACE"
 
-export GOPATH
-(
-    set -e
-    mkdir -p "$GOPATH/src/git.curoverse.com"
-    if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
-        for d in \
-            "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
-                "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
-                "$GOPATH/src/git.curoverse.com/arvados.git"; do
-            [[ -d "$d" ]] && rmdir "$d"
-        done
+    if [[ -z "$CONFIGSRC" ]] && [[ -d "$HOME/arvados-api-server" ]]; then
+        # Jenkins expects us to use this by default.
+        CONFIGSRC="$HOME/arvados-api-server"
     fi
-    for d in \
-        "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
-        "$GOPATH/src/git.curoverse.com/arvados.git"; do
-        [[ -h "$d" ]] && rm "$d"
-    done
-    ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
-    go get -v github.com/kardianos/govendor
-    cd "$GOPATH/src/git.curoverse.com/arvados.git"
-    if [[ -n "$short" ]]; then
-        go get -v -d ...
-        "$GOPATH/bin/govendor" sync
-    else
-        # Remove cached source dirs in workdir. Otherwise, they will
-        # not qualify as +missing or +external below, and we won't be
-        # able to detect that they're missing from vendor/vendor.json.
-        rm -rf vendor/*/
-        go get -v -d ...
-        "$GOPATH/bin/govendor" sync
-        [[ -z $("$GOPATH/bin/govendor" list +unused +missing +external | tee /dev/stderr) ]] \
-            || fatal "vendor/vendor.json has unused or missing dependencies -- try:
 
-(export GOPATH=\"${GOPATH}\"; cd \$GOPATH/src/git.curoverse.com/arvados.git && \$GOPATH/bin/govendor add +missing +external && \$GOPATH/bin/govendor remove +unused)
+    # Clean up .pyc files that may exist in the workspace
+    cd "$WORKSPACE"
+    find -name '*.pyc' -delete
 
-";
+    if [[ -z "$temp" ]]; then
+        temp="$(mktemp -d)"
     fi
-) || fatal "Go setup failed"
 
-setup_virtualenv "$VENVDIR" --python python2.7
-. "$VENVDIR/bin/activate"
+    # Set up temporary install dirs (unless existing dirs were supplied)
+    for tmpdir in VENVDIR VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
+    do
+        if [[ -z "${!tmpdir}" ]]; then
+            eval "$tmpdir"="$temp/$tmpdir"
+        fi
+        if ! [[ -d "${!tmpdir}" ]]; then
+            mkdir "${!tmpdir}" || fatal "can't create ${!tmpdir} (does $temp exist?)"
+        fi
+    done
 
-# Needed for run_test_server.py which is used by certain (non-Python) tests.
-pip install --no-cache-dir PyYAML \
-    || fatal "pip install PyYAML failed"
+    rm -vf "${WORKSPACE}/tmp/*.log"
 
-# Preinstall libcloud if using a fork; otherwise nodemanager "pip
-# install" won't pick it up by default.
-if [[ -n "$LIBCLOUD_PIN_SRC" ]]; then
-    pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
-        || pip install --pre --ignore-installed --no-cache-dir "$LIBCLOUD_PIN_SRC" >/dev/null \
-        || fatal "pip install apache-libcloud failed"
-fi
+    export PERLINSTALLBASE
+    export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
 
-# Deactivate Python 2 virtualenv
-deactivate
+    export R_LIBS
 
-declare -a pythonstuff
-pythonstuff=(
-    sdk/pam
-    sdk/python
-    sdk/python:py3
-    sdk/cwl
-    sdk/cwl:py3
-    services/dockercleaner:py3
-    services/fuse
-    services/nodemanager
-    tools/crunchstat-summary
-    )
+    export GOPATH
 
-# If Python 3 is available, set up its virtualenv in $VENV3DIR.
-# Otherwise, skip dependent tests.
-PYTHON3=$(which python3)
-if [[ ${?} = 0 ]]; then
-    setup_virtualenv "$VENV3DIR" --python python3
-else
-    PYTHON3=
-    cat >&2 <<EOF
+    # Jenkins config requires that glob tmp/*.log match something. Ensure
+    # that happens even if we don't end up running services that set up
+    # logging.
+    mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
+    touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
 
-Warning: python3 could not be found. Python 3 tests will be skipped.
+    unset http_proxy https_proxy no_proxy
 
-EOF
-fi
 
-# Reactivate Python 2 virtualenv
-. "$VENVDIR/bin/activate"
+    # Note: this must be the last time we change PATH, otherwise rvm will
+    # whine a lot.
+    setup_ruby_environment
+
+    echo "PATH is $PATH"
+}
+
+install_env() {
+    (
+        set -e
+        mkdir -p "$GOPATH/src/git.curoverse.com"
+        if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
+            for d in \
+                "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
+                    "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
+                    "$GOPATH/src/git.curoverse.com/arvados.git"; do
+                [[ -d "$d" ]] && rmdir "$d"
+            done
+        fi
+        for d in \
+            "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
+                "$GOPATH/src/git.curoverse.com/arvados.git"; do
+            [[ -h "$d" ]] && rm "$d"
+        done
+        ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+        go get -v github.com/kardianos/govendor
+        cd "$GOPATH/src/git.curoverse.com/arvados.git"
+        if [[ -n "$short" ]]; then
+            go get -v -d ...
+            "$GOPATH/bin/govendor" sync
+        else
+            # Remove cached source dirs in workdir. Otherwise, they will
+            # not qualify as +missing or +external below, and we won't be
+            # able to detect that they're missing from vendor/vendor.json.
+            rm -rf vendor/*/
+            go get -v -d ...
+            "$GOPATH/bin/govendor" sync
+            [[ -z $("$GOPATH/bin/govendor" list +unused +missing +external | tee /dev/stderr) ]] \
+                || fatal "vendor/vendor.json has unused or missing dependencies -- try:
+
+(export GOPATH=\"${GOPATH}\"; cd \$GOPATH/src/git.curoverse.com/arvados.git && \$GOPATH/bin/govendor add +missing +external && \$GOPATH/bin/govendor remove +unused)
 
-# Note: this must be the last time we change PATH, otherwise rvm will
-# whine a lot.
-setup_ruby_environment
+";
+        fi
+    ) || fatal "Go setup failed"
 
-echo "PATH is $PATH"
+    setup_virtualenv "$VENVDIR" --python python2.7
+    . "$VENVDIR/bin/activate"
 
-if ! which bundler >/dev/null
-then
-    gem install --user-install bundler || fatal 'Could not install bundler'
-fi
+    # Needed for run_test_server.py which is used by certain (non-Python) tests.
+    pip install --no-cache-dir PyYAML \
+        || fatal "pip install PyYAML failed"
+
+    # Preinstall libcloud if using a fork; otherwise nodemanager "pip
+    # install" won't pick it up by default.
+    if [[ -n "$LIBCLOUD_PIN_SRC" ]]; then
+        pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
+            || pip install --pre --ignore-installed --no-cache-dir "$LIBCLOUD_PIN_SRC" >/dev/null \
+            || fatal "pip install apache-libcloud failed"
+    fi
 
-# Jenkins config requires that glob tmp/*.log match something. Ensure
-# that happens even if we don't end up running services that set up
-# logging.
-mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
-touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
+    # Deactivate Python 2 virtualenv
+    deactivate
+
+    # If Python 3 is available, set up its virtualenv in $VENV3DIR.
+    # Otherwise, skip dependent tests.
+    PYTHON3=$(which python3)
+    if [[ ${?} = 0 ]]; then
+        setup_virtualenv "$VENV3DIR" --python python3
+    else
+        PYTHON3=
+        cat >&2 <<EOF
+
+Warning: python3 could not be found. Python 3 tests will be skipped.
+
+EOF
+    fi
+
+    if ! which bundler >/dev/null
+    then
+        gem install --user-install bundler || fatal 'Could not install bundler'
+    fi
+}
 
 retry() {
     remain="${repeat}"
@@ -644,7 +656,7 @@ retry() {
         if ${@}; then
             if [[ "$remain" -gt 1 ]]; then
                 remain=$((${remain}-1))
-                title "Repeating ${remain} more times"
+                title "(repeating ${remain} more times)"
             else
                 break
             fi
@@ -672,22 +684,39 @@ do_test() {
             suite="${1}"
             ;;
     esac
-    if [[ -z "${skip[$suite]}" && -z "${skip[$1]}" && \
-              (${#only[@]} -eq 0 || ${only[$suite]} -eq 1 || \
-                   ${only[$1]} -eq 1) ||
-                  ${only[$2]} -eq 1 ]]; then
-        retry do_test_once ${@}
-    else
-        title "Skipping ${1} tests"
+    if [[ -n "${skip[$suite]}" || \
+              -n "${skip[$1]}" || \
+              (${#only[@]} -ne 0 && ${only[$suite]} -eq 0 && ${only[$1]} -eq 0) ]]; then
+        return 0
     fi
+    case "${1}" in
+        services/api)
+            stop_services
+            ;;
+        doc | lib/cli | lib/cloud/azure | lib/cloud/ec2 | lib/cmd | lib/dispatchcloud/ssh_executor | lib/dispatchcloud/worker)
+            # don't care whether services are running
+            ;;
+        *)
+            if ! start_services; then
+                title "test $1 -- failed to start services"
+                return 1
+            fi
+            ;;
+    esac
+    retry do_test_once ${@}
 }
 
 do_test_once() {
     unset result
 
-    title "Running $1 tests"
+    title "test $1"
     timer_reset
-    if [[ "$2" == "go" ]]
+
+    if which deactivate >/dev/null; then deactivate; fi
+    if ! . "$VENVDIR/bin/activate"
+    then
+        result=1
+    elif [[ "$2" == "go" ]]
     then
         covername="coverage-$(echo "$1" | sed -e 's/\//_/g')"
         coverflags=("-covermode=count" "-coverprofile=$WORKSPACE/tmp/.$covername.tmp")
@@ -740,28 +769,25 @@ do_test_once() {
     fi
     result=${result:-$?}
     checkexit $result "$1 tests"
-    title "End of $1 tests (`timer`)"
+    title "test $1 -- `timer`"
     return $result
 }
 
 do_install() {
-  skipit=false
-
-  if [[ -z "${only_install}" || "${only_install}" == "${1}" || "${only_install}" == "${2}" ]]; then
-      retry do_install_once ${@}
-  else
-      skipit=true
-  fi
-
-  if [[ "$skipit" = true ]]; then
-    title "Skipping $1 install"
-  fi
+    if [[ -n "${skip[install]}" || ( -n "${only_install}" && "${only_install}" != "${1}" && "${only_install}" != "${2}" ) ]]; then
+        return 0
+    fi
+    retry do_install_once ${@}
 }
 
 do_install_once() {
-    title "Running $1 install"
+    title "install $1"
     timer_reset
-    if [[ "$2" == "go" ]]
+
+    if which deactivate >/dev/null; then deactivate; fi
+    if [[ "$1" != "env" ]] && ! . "$VENVDIR/bin/activate"; then
+        result=1
+    elif [[ "$2" == "go" ]]
     then
         go get -ldflags "-X main.version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}" -t "git.curoverse.com/arvados.git/$1"
     elif [[ "$2" == "pip" ]]
@@ -789,9 +815,9 @@ do_install_once() {
     else
         "install_$1"
     fi
-    result=$?
+    result=${result:-$?}
     checkexit $result "$1 install"
-    title "End of $1 install (`timer`)"
+    title "install $1 -- `timer`"
     return $result
 }
 
@@ -812,7 +838,6 @@ install_doc() {
         && bundle_install_trylocal \
         && rm -rf .site
 }
-do_install doc
 
 install_gem() {
     gemname=$1
@@ -824,56 +849,32 @@ install_gem() {
         && with_test_gemset gem install --no-ri --no-rdoc $(ls -t "$gemname"-*.gem|head -n1)
 }
 
-install_ruby_sdk() {
+install_sdk/ruby() {
     install_gem arvados sdk/ruby
 }
-do_install sdk/ruby ruby_sdk
 
-install_R_sdk() {
+install_sdk/R() {
   if [[ "$NEED_SDK_R" = true ]]; then
     cd "$WORKSPACE/sdk/R" \
        && Rscript --vanilla install_deps.R
   fi
 }
-do_install sdk/R R_sdk
 
-install_perl_sdk() {
+install_sdk/perl() {
     cd "$WORKSPACE/sdk/perl" \
         && perl Makefile.PL INSTALL_BASE="$PERLINSTALLBASE" \
         && make install INSTALLDIRS=perl
 }
-do_install sdk/perl perl_sdk
 
-install_cli() {
+install_sdk/cli() {
     install_gem arvados-cli sdk/cli
 }
-do_install sdk/cli cli
 
-install_login-sync() {
+install_services/login-sync() {
     install_gem arvados-login-sync services/login-sync
 }
-do_install services/login-sync login-sync
-
-# Install the Python SDK early. Various other test suites (like
-# keepproxy) bring up run_test_server.py, which imports the arvados
-# module. We can't actually *test* the Python SDK yet though, because
-# its own test suite brings up some of those other programs (like
-# keepproxy).
-for p in "${pythonstuff[@]}"
-do
-    dir=${p%:py3}
-    if [[ ${dir} = ${p} ]]; then
-        if [[ -z ${skip[python2]} ]]; then
-            do_install ${dir} pip
-        fi
-    elif [[ -n ${PYTHON3} ]]; then
-        if [[ -z ${skip[python3]} ]]; then
-            do_install ${dir} pip "$VENV3DIR/bin/"
-        fi
-    fi
-done
 
-install_apiserver() {
+install_services/api() {
     cd "$WORKSPACE/services/api" \
         && RAILS_ENV=test bundle_install_trylocal
 
@@ -921,7 +922,19 @@ install_apiserver() {
         && RAILS_ENV=test bundle exec rake db:setup \
         && RAILS_ENV=test bundle exec rake db:fixtures:load
 }
-do_install services/api apiserver
+
+declare -a pythonstuff
+pythonstuff=(
+    sdk/pam
+    sdk/python
+    sdk/python:py3
+    sdk/cwl
+    sdk/cwl:py3
+    services/dockercleaner:py3
+    services/fuse
+    services/nodemanager
+    tools/crunchstat-summary
+)
 
 declare -a gostuff
 gostuff=(
@@ -967,22 +980,15 @@ gostuff=(
     tools/keep-rsync
     tools/sync-groups
 )
-for g in "${gostuff[@]}"
-do
-    do_install "$g" go
-done
 
-install_workbench() {
+install_apps/workbench() {
     cd "$WORKSPACE/apps/workbench" \
         && mkdir -p tmp/cache \
         && RAILS_ENV=test bundle_install_trylocal \
         && RAILS_ENV=test RAILS_GROUPS=assets bundle exec rake npm:install
 }
-do_install apps/workbench workbench
 
-unset http_proxy https_proxy no_proxy
-
-test_doclinkchecker() {
+test_doc() {
     (
         set -e
         cd "$WORKSPACE/doc"
@@ -992,108 +998,239 @@ test_doclinkchecker() {
         PYTHONPATH=$WORKSPACE/sdk/python/ bundle exec rake linkchecker baseurl=file://$WORKSPACE/doc/.site/ arvados_workbench_host=https://workbench.$ARVADOS_API_HOST arvados_api_host=$ARVADOS_API_HOST
     )
 }
-do_test doc doclinkchecker
-
-stop_services
 
-test_apiserver() {
+test_services/api() {
     rm -f "$WORKSPACE/services/api/git-commit.version"
     cd "$WORKSPACE/services/api" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test TESTOPTS=-v ${testargs[services/api]}
 }
-do_test services/api apiserver
-
-# Shortcut for when we're only running apiserver tests. This saves a bit of time,
-# because we don't need to start up the api server for subsequent tests.
-if [ ! -z "$only" ] && [ "$only" == "services/api" ]; then
-  rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
-  exit_cleanly
-fi
-
-start_services || { stop_services; fatal "start_services"; }
 
-test_ruby_sdk() {
+test_sdk/ruby() {
     cd "$WORKSPACE/sdk/ruby" \
         && bundle exec rake test TESTOPTS=-v ${testargs[sdk/ruby]}
 }
-do_test sdk/ruby ruby_sdk
 
-test_R_sdk() {
+test_sdk/R() {
   if [[ "$NEED_SDK_R" = true ]]; then
     cd "$WORKSPACE/sdk/R" \
         && Rscript --vanilla run_test.R
   fi
 }
 
-do_test sdk/R R_sdk
-
-test_cli() {
+test_sdk/cli() {
     cd "$WORKSPACE/sdk/cli" \
         && mkdir -p /tmp/keep \
         && KEEP_LOCAL_STORE=/tmp/keep bundle exec rake test TESTOPTS=-v ${testargs[sdk/cli]}
 }
-do_test sdk/cli cli
 
-test_login-sync() {
+test_services/login-sync() {
     cd "$WORKSPACE/services/login-sync" \
         && bundle exec rake test TESTOPTS=-v ${testargs[services/login-sync]}
 }
-do_test services/login-sync login-sync
 
-test_nodemanager_integration() {
+test_services/nodemanager_integration() {
     cd "$WORKSPACE/services/nodemanager" \
         && tests/integration_test.py ${testargs[services/nodemanager_integration]}
 }
-do_test services/nodemanager_integration nodemanager_integration
 
-for p in "${pythonstuff[@]}"
-do
-    dir=${p%:py3}
-    if [[ ${dir} = ${p} ]]; then
-        if [[ -z ${skip[python2]} ]]; then
-            do_test ${dir} pip
-        fi
-    elif [[ -n ${PYTHON3} ]]; then
-        if [[ -z ${skip[python3]} ]]; then
-            do_test ${dir} pip "$VENV3DIR/bin/"
-        fi
-    fi
-done
-
-for g in "${gostuff[@]}"
-do
-    do_test "$g" go
-done
-
-test_workbench_units() {
+test_apps/workbench_units() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:units TESTOPTS=-v ${testargs[apps/workbench]}
 }
-do_test apps/workbench_units workbench_units
 
-test_workbench_functionals() {
+test_apps/workbench_functionals() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:functionals TESTOPTS=-v ${testargs[apps/workbench]}
 }
-do_test apps/workbench_functionals workbench_functionals
 
-test_workbench_integration() {
+test_apps/workbench_integration() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:integration TESTOPTS=-v ${testargs[apps/workbench]}
 }
-do_test apps/workbench_integration workbench_integration
-
 
-test_workbench_benchmark() {
+test_apps/workbench_benchmark() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:benchmark ${testargs[apps/workbench_benchmark]}
 }
-do_test apps/workbench_benchmark workbench_benchmark
 
-test_workbench_profile() {
+test_apps/workbench_profile() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:profile ${testargs[apps/workbench_profile]}
 }
-do_test apps/workbench_profile workbench_profile
 
+install_deps() {
+    # Install parts needed by test suites
+    do_install env
+    do_install cmd/arvados-server go
+    do_install sdk/cli
+    do_install sdk/perl
+    do_install sdk/python pip
+    do_install sdk/ruby
+    do_install services/api
+    do_install services/arv-git-httpd go
+    do_install services/keepproxy go
+    do_install services/keepstore go
+    do_install services/keep-web go
+    do_install services/ws go
+}
+
+install_all() {
+    do_install env
+    do_install doc
+    do_install sdk/ruby
+    do_install sdk/R
+    do_install sdk/perl
+    do_install sdk/cli
+    do_install services/login-sync
+    for p in "${pythonstuff[@]}"
+    do
+        dir=${p%:py3}
+        if [[ ${dir} = ${p} ]]; then
+            if [[ -z ${skip[python2]} ]]; then
+                do_install ${dir} pip
+            fi
+        elif [[ -n ${PYTHON3} ]]; then
+            if [[ -z ${skip[python3]} ]]; then
+                do_install ${dir} pip "$VENV3DIR/bin/"
+            fi
+        fi
+    done
+    do_install services/api
+    for g in "${gostuff[@]}"
+    do
+        do_install "$g" go
+    done
+    do_install apps/workbench
+}
+
+test_all() {
+    stop_services
+    do_test services/api
+
+    # Shortcut for when we're only running apiserver tests. This saves a bit of time,
+    # because we don't need to start up the api server for subsequent tests.
+    if [ ! -z "$only" ] && [ "$only" == "services/api" ]; then
+        rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
+        exit_cleanly
+    fi
+
+    do_test doc
+    do_test sdk/ruby
+    do_test sdk/R
+    do_test sdk/cli
+    do_test services/login-sync
+    do_test services/nodemanager_integration
+    for p in "${pythonstuff[@]}"
+    do
+        dir=${p%:py3}
+        if [[ ${dir} = ${p} ]]; then
+            if [[ -z ${skip[python2]} ]]; then
+                do_test ${dir} pip
+            fi
+        elif [[ -n ${PYTHON3} ]]; then
+            if [[ -z ${skip[python3]} ]]; then
+                do_test ${dir} pip "$VENV3DIR/bin/"
+            fi
+        fi
+    done
+
+    for g in "${gostuff[@]}"
+    do
+        do_test "$g" go
+    done
+    do_test apps/workbench_units
+    do_test apps/workbench_functionals
+    do_test apps/workbench_integration
+    do_test apps/workbench_benchmark
+    do_test apps/workbench_profile
+}
+
+help_interactive() {
+    echo "== Interactive commands:"
+    echo "TARGET                 (short for 'test DIR')"
+    echo "test TARGET"
+    echo "test TARGET:py3        (test with python3)"
+    echo "test TARGET -check.vv  (pass arguments to test)"
+    echo "install TARGET"
+    echo "install env            (go/python libs)"
+    echo "install deps           (go/python libs + arvados components needed for integration tests)"
+    echo "reset                  (...services used by integration tests)"
+    echo "exit"
+    echo "== Test targets:"
+    echo "${!testfuncargs[@]}" | tr ' ' '\n' | sort | column
+}
+
+initialize
+
+declare -A testfuncargs=()
+for g in "${gostuff[@]}"; do
+    testfuncargs[$g]="$g go"
+done
+for p in "${pythonstuff[@]}"; do
+    dir=${p%:py3}
+    if [[ ${dir} = ${p} ]]; then
+        testfuncargs[$p]="$dir pip $VENVDIR/bin/"
+    else
+        testfuncargs[$p]="$dir pip $VENV3DIR/bin/"
+    fi
+done
+
+if [[ -z ${interactive} ]]; then
+    install_all
+    test_all
+else
+    skip=()
+    only=()
+    only_install=()
+    if [[ -e "$VENVDIR/bin/activate" ]]; then stop_services; fi
+    setnextcmd() {
+        if [[ "$nextcmd" != "install deps" ]]; then
+            :
+        elif [[ -e "$VENVDIR/bin/activate" ]]; then
+            nextcmd="test lib/cmd"
+        else
+            nextcmd="install deps"
+        fi
+    }
+    echo
+    help_interactive
+    nextcmd="install deps"
+    setnextcmd
+    while read -p 'What next? ' -e -i "${nextcmd}" nextcmd; do
+        read verb target opts <<<"${nextcmd}"
+        case "${verb}" in
+            "" | "help")
+                help_interactive
+                ;;
+            "exit" | "quit")
+                exit_cleanly
+                ;;
+            "reset")
+                stop_services
+                ;;
+            *)
+                target="${target%/}"
+                testargs["$target"]="${opts}"
+                case "$target" in
+                    all | deps)
+                        ${verb}_${target}
+                        ;;
+                    *)
+                        tt="${testfuncargs[${target}]}"
+                        tt="${tt:-$target}"
+                        do_$verb $tt
+                        ;;
+                esac
+                ;;
+        esac
+        if [[ ${#successes[@]} -gt 0 || ${#failures[@]} -gt 0 ]]; then
+            report_outcomes
+            successes=()
+            failures=()
+        fi
+        cd "$WORKSPACE"
+        setnextcmd
+    done
+    echo
+fi
 exit_cleanly
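
With the script reorganized into functions, run-tests.sh gains an --interactive mode: after initialize() it reads lines of the form "verb target opts", looks the target up in the testfuncargs table, and calls do_test or do_install with the stored arguments. A rough Go analogue of that dispatch loop, purely illustrative (the handlers are placeholders, not real commands):

    package main

    import (
        "bufio"
        "fmt"
        "os"
        "strings"
    )

    func main() {
        // Placeholder handlers standing in for do_test / do_install.
        handlers := map[string]func(target, opts string){
            "test":    func(t, o string) { fmt.Println("would test", t, o) },
            "install": func(t, o string) { fmt.Println("would install", t, o) },
        }
        in := bufio.NewScanner(os.Stdin)
        fmt.Print("What next? ")
        for in.Scan() {
            fields := strings.SplitN(strings.TrimSpace(in.Text()), " ", 3)
            verb := fields[0]
            if verb == "exit" || verb == "quit" {
                break
            }
            target, opts := "", ""
            if len(fields) > 1 {
                target = strings.TrimSuffix(fields[1], "/")
            }
            if len(fields) > 2 {
                opts = fields[2]
            }
            if h, ok := handlers[verb]; ok {
                h(target, opts)
            } else {
                fmt.Println("unknown verb:", verb)
            }
            fmt.Print("What next? ")
        }
    }
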
diff --git a/doc/user/cwl/bwa-mem/bwa-mem-input-mixed.yml b/doc/user/cwl/bwa-mem/bwa-mem-input-mixed.yml
new file mode 100755 (executable)
index 0000000..73bd9f5
--- /dev/null
@@ -0,0 +1,29 @@
+#!/usr/bin/env cwl-runner
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: CC-BY-SA-3.0
+
+# Example input file providing both content addresses and UUIDs.  The
+# collections identified by 'collectionUUID' will be checked that the
+# current content of the collection record matches the content address
+# in the 'location' field.
+
+$namespaces:
+  arv: 'http://arvados.org/cwl#'
+
+cwl:tool: bwa-mem.cwl
+reference:
+  class: File
+  location: keep:2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt
+  arv:collectionUUID: qr1hi-4zz18-pwid4w22a40jp8l
+read_p1:
+  class: File
+  location: keep:ae480c5099b81e17267b7445e35b4bc7+180/HWI-ST1027_129_D0THKACXX.1_1.fastq
+  arv:collectionUUID: qr1hi-4zz18-h615rgfmqt3wje0
+read_p2:
+  class: File
+  location: keep:ae480c5099b81e17267b7445e35b4bc7+180/HWI-ST1027_129_D0THKACXX.1_2.fastq
+  arv:collectionUUID: qr1hi-4zz18-h615rgfmqt3wje0
+group_id: arvados_tutorial
+sample_id: HWI-ST1027_129
+PL: illumina
diff --git a/doc/user/cwl/bwa-mem/bwa-mem-input-uuids.yml b/doc/user/cwl/bwa-mem/bwa-mem-input-uuids.yml
new file mode 100755 (executable)
index 0000000..7e71e95
--- /dev/null
@@ -0,0 +1,21 @@
+#!/usr/bin/env cwl-runner
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: CC-BY-SA-3.0
+
+# Example input file using UUIDs to reference input collections. These
+# will be resolved to content addresses before running the workflow.
+
+cwl:tool: bwa-mem.cwl
+reference:
+  class: File
+  location: keep:qr1hi-4zz18-pwid4w22a40jp8l/19.fasta.bwt
+read_p1:
+  class: File
+  location: keep:qr1hi-4zz18-h615rgfmqt3wje0/HWI-ST1027_129_D0THKACXX.1_1.fastq
+read_p2:
+  class: File
+  location: keep:qr1hi-4zz18-h615rgfmqt3wje0/HWI-ST1027_129_D0THKACXX.1_2.fastq
+group_id: arvados_tutorial
+sample_id: HWI-ST1027_129
+PL: illumina
diff --git a/doc/user/cwl/cwl-runner.html.textile.liquid b/doc/user/cwl/cwl-runner.html.textile.liquid
index ad5d3bd83643e6d9134dbfddddfdf2209be66140..fbce8e17beb7a874689f3de265394a73dcf8c2cb 100644 (file)
@@ -69,7 +69,7 @@ arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107,
 2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Overall process status is success
 {
     "aligned_sam": {
-        "path": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
+        "location": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
         "checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
         "class": "File",
         "size": 30738986
@@ -82,7 +82,7 @@ h3. Referencing files
 
 When running a workflow on an Arvados cluster, the input files must be stored in Keep.  There are several ways this can happen.
 
-A URI reference to Keep uses the @keep:@ scheme followed by the portable data hash, collection size, and path to the file inside the collection.  For example, @keep:2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt@.
+A URI reference to Keep uses the @keep:@ scheme followed by either the portable data hash or UUID of the collection and then the location of the file inside the collection.  For example, @keep:2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt@ or @keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/19.fasta.bwt@.
 
 If you reference a file in "arv-mount":{{site.baseurl}}/user/tutorials/tutorial-keep-mount.html, such as @/home/example/keep/by_id/2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt@, then @arvados-cwl-runner@ will automatically determine the appropriate Keep URI reference.
 
@@ -100,7 +100,7 @@ arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107,
 2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Overall process status is success
 {
     "aligned_sam": {
-        "path": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
+        "location": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
         "checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
         "class": "File",
         "size": 30738986
diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
index bbe47625a893d6874d2c3c415952948f290de74f..4e807a12ab0cb55c27e7f7e3319136c904644f2a 100644 (file)
@@ -5,6 +5,7 @@
 package container
 
 import (
+       "errors"
        "io"
        "sync"
        "time"
@@ -398,32 +399,62 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
        }
        apply(avail)
 
-       var missing []string
+       missing := map[string]bool{}
        cq.mtx.Lock()
        for uuid, ent := range cq.current {
                if next[uuid] == nil &&
                        ent.Container.State != arvados.ContainerStateCancelled &&
                        ent.Container.State != arvados.ContainerStateComplete {
-                       missing = append(missing, uuid)
+                       missing[uuid] = true
                }
        }
        cq.mtx.Unlock()
 
-       for i, page := 0, 20; i < len(missing); i += page {
-               batch := missing[i:]
-               if len(batch) > page {
-                       batch = batch[:page]
+       for len(missing) > 0 {
+               var batch []string
+               for uuid := range missing {
+                       batch = append(batch, uuid)
+                       if len(batch) == 20 {
+                               break
+                       }
                }
+               filters := []arvados.Filter{{"uuid", "in", batch}}
                ended, err := cq.fetchAll(arvados.ResourceListParams{
                        Select:  selectParam,
                        Order:   "uuid",
                        Count:   "none",
-                       Filters: []arvados.Filter{{"uuid", "in", batch}},
+                       Filters: filters,
                })
                if err != nil {
                        return nil, err
                }
                apply(ended)
+               if len(ended) == 0 {
+                       // This is the only case where we can conclude
+                       // a container has been deleted from the
+                       // database. A short (but non-zero) page, on
+                       // the other hand, can be caused by a response
+                       // size limit.
+                       for _, uuid := range batch {
+                               cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
+                               delete(missing, uuid)
+                               cq.mtx.Lock()
+                               cq.delEnt(uuid, cq.current[uuid].Container.State)
+                               cq.mtx.Unlock()
+                       }
+                       continue
+               }
+               for _, ctr := range ended {
+                       if _, ok := missing[ctr.UUID]; !ok {
+                               msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
+                               cq.logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": ctr.UUID,
+                                       "Filters":       filters,
+                               }).Error(msg)
+                               return nil, errors.New(msg)
+                       }
+                       delete(missing, ctr.UUID)
+               }
        }
        return next, nil
 }
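
The reworked poll() keeps the unexplained container UUIDs in a set, asks the controller about up to 20 of them per request, and removes an entry only when the response accounts for it; an entirely empty response is the only evidence that the whole batch was deleted, since a short page may just reflect a response-size limit. A standalone sketch of that loop shape, with fetch() standing in for the real paged API call:

    package main

    import "fmt"

    // fetch stands in for the filtered, paged API lookup; it returns the
    // subset of the requested UUIDs that still exist (possibly truncated).
    func fetch(batch []string) []string {
        return nil // pretend every container in the batch was deleted
    }

    func resolveMissing(missing map[string]bool) {
        for len(missing) > 0 {
            var batch []string
            for uuid := range missing {
                batch = append(batch, uuid)
                if len(batch) == 20 {
                    break
                }
            }
            found := fetch(batch)
            if len(found) == 0 {
                // Only an empty response proves deletion; a short page
                // might just be a size limit.
                for _, uuid := range batch {
                    fmt.Println("deleted:", uuid)
                    delete(missing, uuid)
                }
                continue
            }
            for _, uuid := range found {
                delete(missing, uuid)
            }
        }
    }

    func main() {
        resolveMissing(map[string]bool{"zzzzz-dz642-000000000000000": true})
    }
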
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 36b06020748f43f5f4c7bbdefb5302935dedb861..7268f106a9f36ba933da51ecba4465ba760a8820 100644 (file)
@@ -62,6 +62,8 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                        ProbeInterval:      arvados.Duration(5 * time.Millisecond),
                        StaleLockTimeout:   arvados.Duration(5 * time.Millisecond),
                        MaxProbesPerSecond: 1000,
+                       TimeoutSignal:      arvados.Duration(3 * time.Millisecond),
+                       TimeoutTERM:        arvados.Duration(20 * time.Millisecond),
                },
                InstanceTypes: arvados.InstanceTypeMap{
                        test.InstanceType(1).Name:  test.InstanceType(1),
@@ -124,17 +126,20 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        for _, ctr := range queue.Containers {
                waiting[ctr.UUID] = struct{}{}
        }
-       executeContainer := func(ctr arvados.Container) int {
+       finishContainer := func(ctr arvados.Container) {
                mtx.Lock()
                defer mtx.Unlock()
                if _, ok := waiting[ctr.UUID]; !ok {
-                       c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
-                       return 1
+                       c.Errorf("container completed twice: %s", ctr.UUID)
+                       return
                }
                delete(waiting, ctr.UUID)
                if len(waiting) == 0 {
                        close(done)
                }
+       }
+       executeContainer := func(ctr arvados.Container) int {
+               finishContainer(ctr)
                return int(rand.Uint32() & 0x3)
        }
        n := 0
@@ -144,6 +149,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
                stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
                stubvm.ExecuteContainer = executeContainer
+               stubvm.CrashRunningContainer = finishContainer
                switch n % 7 {
                case 0:
                        stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
@@ -261,7 +267,13 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
        c.Check(ok, check.Equals, true)
        <-ch
 
-       sr = getInstances()
+       for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
+               sr = getInstances()
+               if len(sr.Items) > 0 {
+                       break
+               }
+               time.Sleep(time.Millisecond)
+       }
        c.Assert(len(sr.Items), check.Equals, 1)
        c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
        c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
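
The test now polls getInstances() until an item appears or a one-second deadline passes, rather than assuming the list is populated as soon as the notification channel fires. The same wait-with-deadline idiom, written as a small helper (a sketch, not code from the repo):

    package main

    import (
        "fmt"
        "time"
    )

    // eventually polls cond every millisecond until it returns true or
    // the timeout expires, and reports whether cond ever succeeded.
    func eventually(timeout time.Duration, cond func() bool) bool {
        for deadline := time.Now().Add(timeout); time.Now().Before(deadline); time.Sleep(time.Millisecond) {
            if cond() {
                return true
            }
        }
        return cond()
    }

    func main() {
        start := time.Now()
        ok := eventually(time.Second, func() bool { return time.Since(start) > 10*time.Millisecond })
        fmt.Println(ok) // true
    }
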
diff --git a/lib/dispatchcloud/scheduler/fix_stale_locks.go b/lib/dispatchcloud/scheduler/fix_stale_locks.go
index 148b653c2e52305b2ece2255c49d98bf6cb72f50..1f9338f7b8d02e66d73239cc0f0c76ebc53ab423 100644 (file)
@@ -47,7 +47,6 @@ waiting:
                        // Give up.
                        break waiting
                }
-
        }
 
        for _, uuid := range stale {
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
index 18cdc94fa52156ceab01d7dbe135d8db20029176..307807e32337257f14d02178cbd6e98de61f7be8 100644 (file)
@@ -38,7 +38,7 @@ type WorkerPool interface {
        Create(arvados.InstanceType) bool
        Shutdown(arvados.InstanceType) bool
        StartContainer(arvados.InstanceType, arvados.Container) bool
-       KillContainer(uuid string)
+       KillContainer(uuid, reason string)
        Subscribe() <-chan struct{}
        Unsubscribe(<-chan struct{})
 }
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 4296a1364c911fc94d44af28512ecac195b4e5f5..dab324579dc84e9c8c93086fe693a638f029191f 100644 (file)
@@ -77,7 +77,7 @@ func (p *stubPool) Create(it arvados.InstanceType) bool {
        p.unalloc[it]++
        return true
 }
-func (p *stubPool) KillContainer(uuid string) {
+func (p *stubPool) KillContainer(uuid, reason string) {
        p.Lock()
        defer p.Unlock()
        delete(p.running, uuid)
@@ -335,3 +335,40 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
        }
        c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
 }
+
+func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+       pool := stubPool{
+               unalloc: map[arvados.InstanceType]int{
+                       test.InstanceType(2): 0,
+               },
+               idle: map[arvados.InstanceType]int{
+                       test.InstanceType(2): 0,
+               },
+               running: map[string]time.Time{
+                       test.ContainerUUID(2): time.Time{},
+               },
+       }
+       queue := test.Queue{
+               ChooseType: chooseType,
+               Containers: []arvados.Container{
+                       {
+                               // create a new worker
+                               UUID:     test.ContainerUUID(1),
+                               Priority: 1,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+               },
+       }
+       queue.Update()
+       sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+       c.Check(pool.running, check.HasLen, 1)
+       sch.sync()
+       for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+       }
+       c.Check(pool.Running(), check.HasLen, 0)
+}
diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go
index 23fc621dea26c76be659ddc4f88bea7565f4bd4c..99bee484c6f7162a3e875b7627738a555fb46e13 100644 (file)
@@ -30,11 +30,11 @@ func (sch *Scheduler) sync() {
                switch ent.Container.State {
                case arvados.ContainerStateRunning:
                        if !running {
-                               go sch.cancel(ent, "not running on any worker")
+                               go sch.cancel(uuid, "not running on any worker")
                        } else if !exited.IsZero() && qUpdated.After(exited) {
-                               go sch.cancel(ent, "state=\"Running\" after crunch-run exited")
+                               go sch.cancel(uuid, "state=Running after crunch-run exited")
                        } else if ent.Container.Priority == 0 {
-                               go sch.kill(ent, "priority=0")
+                               go sch.kill(uuid, "priority=0")
                        }
                case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
                        if running {
@@ -46,7 +46,7 @@ func (sch *Scheduler) sync() {
                                // of kill() will be to make the
                                // worker available for the next
                                // container.
-                               go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
                        } else {
                                sch.logger.WithFields(logrus.Fields{
                                        "ContainerUUID": uuid,
@@ -60,13 +60,13 @@ func (sch *Scheduler) sync() {
                                // a network outage and is still
                                // preparing to run a container that
                                // has already been unlocked/requeued.
-                               go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
                        }
                case arvados.ContainerStateLocked:
                        if running && !exited.IsZero() && qUpdated.After(exited) {
                                go sch.requeue(ent, "crunch-run exited")
                        } else if running && exited.IsZero() && ent.Container.Priority == 0 {
-                               go sch.kill(ent, "priority=0")
+                               go sch.kill(uuid, "priority=0")
                        } else if !running && ent.Container.Priority == 0 {
                                go sch.requeue(ent, "priority=0")
                        }
@@ -77,10 +77,14 @@ func (sch *Scheduler) sync() {
                        }).Error("BUG: unexpected state")
                }
        }
+       for uuid := range running {
+               if _, known := qEntries[uuid]; !known {
+                       go sch.kill(uuid, "not in queue")
+               }
+       }
 }
 
-func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
-       uuid := ent.Container.UUID
+func (sch *Scheduler) cancel(uuid string, reason string) {
        if !sch.uuidLock(uuid, "cancel") {
                return
        }
@@ -93,11 +97,8 @@ func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
        }
 }
 
-func (sch *Scheduler) kill(ent container.QueueEnt, reason string) {
-       uuid := ent.Container.UUID
-       logger := sch.logger.WithField("ContainerUUID", uuid)
-       logger.Debugf("killing crunch-run process because %s", reason)
-       sch.pool.KillContainer(uuid)
+func (sch *Scheduler) kill(uuid string, reason string) {
+       sch.pool.KillContainer(uuid, reason)
 }
 
 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go
index e18a2b5362b413a16eeb28984bfabbe34796f6db..6a6c952358bcadff447d08869e641f32d24618ad 100644 (file)
@@ -158,14 +158,21 @@ func (q *Queue) Update() error {
 //
 // The resulting changes are not exposed through Get() or Entries()
 // until the next call to Update().
-func (q *Queue) Notify(upd arvados.Container) {
+//
+// Return value is true unless the update is rejected (invalid state
+// transition).
+func (q *Queue) Notify(upd arvados.Container) bool {
        q.mtx.Lock()
        defer q.mtx.Unlock()
        for i, ctr := range q.Containers {
                if ctr.UUID == upd.UUID {
-                       q.Containers[i] = upd
-                       return
+                       if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+                               q.Containers[i] = upd
+                               return true
+                       }
+                       return false
                }
        }
        q.Containers = append(q.Containers, upd)
+       return true
 }
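
Notify() now rejects updates to containers that have already reached Complete or Cancelled and reports the rejection, so a late crunch-run stub cannot resurrect a finished record. A minimal sketch of that terminal-state guard, using plain strings instead of the real arvados.ContainerState values:

    package main

    import "fmt"

    func isTerminal(state string) bool {
        return state == "Complete" || state == "Cancelled"
    }

    // notify applies upd only if the current state is not terminal, and
    // reports whether the update was accepted.
    func notify(cur *string, upd string) bool {
        if isTerminal(*cur) {
            return false
        }
        *cur = upd
        return true
    }

    func main() {
        state := "Running"
        fmt.Println(notify(&state, "Complete"), state) // true Complete
        fmt.Println(notify(&state, "Running"), state)  // false Complete
    }
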
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 4df39d0c46ac7dc538e5c7d9949cd884df8768b4..a4521eab7bb9074b02a2f06ef50b781971099d3d 100644 (file)
@@ -126,6 +126,8 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
                tags:         copyTags(tags),
                providerType: it.ProviderType,
                initCommand:  cmd,
+               running:      map[string]int64{},
+               killing:      map[string]bool{},
        }
        svm.SSHService = SSHService{
                HostKey:        sis.driver.HostKey,
@@ -177,12 +179,13 @@ func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
 // running (and might change IP addresses, shut down, etc.)  without
 // updating any stubInstances that have been returned to callers.
 type StubVM struct {
-       Boot                 time.Time
-       Broken               time.Time
-       CrunchRunMissing     bool
-       CrunchRunCrashRate   float64
-       CrunchRunDetachDelay time.Duration
-       ExecuteContainer     func(arvados.Container) int
+       Boot                  time.Time
+       Broken                time.Time
+       CrunchRunMissing      bool
+       CrunchRunCrashRate    float64
+       CrunchRunDetachDelay  time.Duration
+       ExecuteContainer      func(arvados.Container) int
+       CrashRunningContainer func(arvados.Container)
 
        sis          *StubInstanceSet
        id           cloud.InstanceID
@@ -190,7 +193,9 @@ type StubVM struct {
        initCommand  cloud.InitCommand
        providerType string
        SSHService   SSHService
-       running      map[string]bool
+       running      map[string]int64
+       killing      map[string]bool
+       lastPID      int64
        sync.Mutex
 }
 
@@ -244,16 +249,16 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                        }
                }
                svm.Lock()
-               if svm.running == nil {
-                       svm.running = map[string]bool{}
-               }
-               svm.running[uuid] = true
+               svm.lastPID++
+               pid := svm.lastPID
+               svm.running[uuid] = pid
                svm.Unlock()
                time.Sleep(svm.CrunchRunDetachDelay)
                fmt.Fprintf(stderr, "starting %s\n", uuid)
                logger := svm.sis.logger.WithFields(logrus.Fields{
                        "Instance":      svm.id,
                        "ContainerUUID": uuid,
+                       "PID":           pid,
                })
                logger.Printf("[test] starting crunch-run stub")
                go func() {
@@ -263,37 +268,43 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                                logger.Print("[test] container not in queue")
                                return
                        }
+
+                       defer func() {
+                               if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil {
+                                       svm.CrashRunningContainer(ctr)
+                               }
+                       }()
+
                        if crashluck > svm.CrunchRunCrashRate/2 {
                                time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
                                ctr.State = arvados.ContainerStateRunning
-                               queue.Notify(ctr)
+                               if !queue.Notify(ctr) {
+                                       ctr, _ = queue.Get(uuid)
+                                       logger.Print("[test] erroring out because state=Running update was rejected")
+                                       return
+                               }
                        }
 
                        time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+
                        svm.Lock()
-                       _, running := svm.running[uuid]
-                       svm.Unlock()
-                       if !running {
+                       defer svm.Unlock()
+                       if svm.running[uuid] != pid {
                                logger.Print("[test] container was killed")
                                return
                        }
-                       if svm.ExecuteContainer != nil {
-                               ctr.ExitCode = svm.ExecuteContainer(ctr)
-                       }
-                       // TODO: Check whether the stub instance has
-                       // been destroyed, and if so, don't call
-                       // queue.Notify. Then "container finished
-                       // twice" can be classified as a bug.
+                       delete(svm.running, uuid)
+
                        if crashluck < svm.CrunchRunCrashRate {
-                               logger.Print("[test] crashing crunch-run stub")
+                               logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
                        } else {
+                               if svm.ExecuteContainer != nil {
+                                       ctr.ExitCode = svm.ExecuteContainer(ctr)
+                               }
+                               logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub")
                                ctr.State = arvados.ContainerStateComplete
-                               queue.Notify(ctr)
+                               go queue.Notify(ctr)
                        }
-                       logger.Print("[test] exiting crunch-run stub")
-                       svm.Lock()
-                       defer svm.Unlock()
-                       delete(svm.running, uuid)
                }()
                return 0
        }
@@ -307,13 +318,34 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
        }
        if strings.HasPrefix(command, "crunch-run --kill ") {
                svm.Lock()
-               defer svm.Unlock()
-               if svm.running[uuid] {
-                       delete(svm.running, uuid)
+               pid, running := svm.running[uuid]
+               if running && !svm.killing[uuid] {
+                       svm.killing[uuid] = true
+                       go func() {
+                               time.Sleep(time.Duration(math_rand.Float64()*30) * time.Millisecond)
+                               svm.Lock()
+                               defer svm.Unlock()
+                               if svm.running[uuid] == pid {
+                                       // Kill only if the running entry
+                                       // hasn't since been killed and
+                                       // replaced with a different one.
+                                       delete(svm.running, uuid)
+                               }
+                               delete(svm.killing, uuid)
+                       }()
+                       svm.Unlock()
+                       time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
+                       svm.Lock()
+                       _, running = svm.running[uuid]
+               }
+               svm.Unlock()
+               if running {
+                       fmt.Fprintf(stderr, "%s: container is running\n", uuid)
+                       return 1
                } else {
                        fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+                       return 0
                }
-               return 0
        }
        if command == "true" {
                return 0
index e81c2c091f1c37c7b52488b4d919bdb9a9fe4d79..81a658535ea593a7a0a0c2d9231fd66f3055bdbd 100644 (file)
@@ -68,6 +68,8 @@ const (
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
+       defaultTimeoutTERM        = time.Minute * 2
+       defaultTimeoutSignal      = time.Second * 5
 
        // Time after a quota error to try again anyway, even if no
        // instances have been shutdown.
@@ -105,6 +107,8 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               timeoutTERM:        duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+               timeoutSignal:      duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
                installPublicKey:   installPublicKey,
                stop:               make(chan bool),
        }
@@ -136,6 +140,8 @@ type Pool struct {
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
+       timeoutTERM        time.Duration
+       timeoutSignal      time.Duration
        installPublicKey   ssh.PublicKey
 
        // private state
@@ -319,9 +325,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
        if !ok {
                return errors.New("requested instance does not exist")
        }
-       wkr.idleBehavior = idleBehavior
-       wkr.saveTags()
-       wkr.shutdownIfIdle()
+       wkr.setIdleBehavior(idleBehavior)
        return nil
 }
 
@@ -383,19 +387,14 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor
                probed:       now,
                busy:         now,
                updated:      now,
-               running:      make(map[string]struct{}),
-               starting:     make(map[string]struct{}),
+               running:      make(map[string]*remoteRunner),
+               starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
 }
 
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
-       wp.exited[uuid] = t
-}
-
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
@@ -419,8 +418,12 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
 }
 
 // CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
 func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
+       wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
@@ -482,53 +485,29 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Reason":        reason,
+       })
        if _, ok := wp.exited[uuid]; ok {
-               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               logger.Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
-               if _, ok := wkr.running[uuid]; ok {
-                       go wp.kill(wkr, uuid)
-                       return
+               rr := wkr.running[uuid]
+               if rr == nil {
+                       rr = wkr.starting[uuid]
                }
-       }
-       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
-       logger := wp.logger.WithFields(logrus.Fields{
-               "ContainerUUID": uuid,
-               "Instance":      wkr.instance.ID(),
-       })
-       logger.Debug("killing process")
-       cmd := "crunch-run --kill 15 " + uuid
-       if u := wkr.instance.RemoteUser(); u != "root" {
-               cmd = "sudo " + cmd
-       }
-       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
-       if err != nil {
-               logger.WithFields(logrus.Fields{
-                       "stderr": string(stderr),
-                       "stdout": string(stdout),
-                       "error":  err,
-               }).Warn("kill failed")
-               return
-       }
-       logger.Debug("killing process succeeded")
-       wp.mtx.Lock()
-       defer wp.mtx.Unlock()
-       if _, ok := wkr.running[uuid]; ok {
-               delete(wkr.running, uuid)
-               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
-                       wkr.state = StateIdle
+               if rr != nil {
+                       rr.Kill(reason)
+                       return
                }
-               wkr.updated = time.Now()
-               go wp.notify()
        }
+       logger.Debug("cannot kill: already disappeared")
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
@@ -781,11 +760,12 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
-               go wkr.executor.Close()
+               go wkr.Close()
                notify = true
        }
 
        if !wp.loaded {
+               notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }
@@ -795,6 +775,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        }
 }
 
+func (wp *Pool) waitUntilLoaded() {
+       ch := wp.Subscribe()
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       for !wp.loaded {
+               wp.mtx.RUnlock()
+               <-ch
+               wp.mtx.RLock()
+       }
+}
+
 // Return a random string of n hexadecimal digits (n*4 random bits). n
 // must be even.
 func randomHex(n int) string {
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
new file mode 100644 (file)
index 0000000..c30ff9f
--- /dev/null
@@ -0,0 +1,154 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "syscall"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
+)
+
+// remoteRunner handles the starting and stopping of a crunch-run
+// process on a remote machine.
+type remoteRunner struct {
+       uuid          string
+       executor      Executor
+       arvClient     *arvados.Client
+       remoteUser    string
+       timeoutTERM   time.Duration
+       timeoutSignal time.Duration
+       onUnkillable  func(uuid string) // callback invoked when giving up on SIGTERM
+       onKilled      func(uuid string) // callback invoked when process exits after SIGTERM
+       logger        logrus.FieldLogger
+
+       stopping bool          // true if Stop() has been called
+       givenup  bool          // true if timeoutTERM has been reached
+       closed   chan struct{} // channel is closed if Close() has been called
+}
+
+// newRemoteRunner returns a new remoteRunner. Caller should ensure
+// Close() is called to release resources.
+func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+       rr := &remoteRunner{
+               uuid:          uuid,
+               executor:      wkr.executor,
+               arvClient:     wkr.wp.arvClient,
+               remoteUser:    wkr.instance.RemoteUser(),
+               timeoutTERM:   wkr.wp.timeoutTERM,
+               timeoutSignal: wkr.wp.timeoutSignal,
+               onUnkillable:  wkr.onUnkillable,
+               onKilled:      wkr.onKilled,
+               logger:        wkr.logger.WithField("ContainerUUID", uuid),
+               closed:        make(chan struct{}),
+       }
+       return rr
+}
+
+// Start a crunch-run process on the remote host.
+//
+// Start does not return any error encountered. The caller should
+// assume the remote process _might_ have started, at least until it
+// probes the worker and finds otherwise.
+func (rr *remoteRunner) Start() {
+       env := map[string]string{
+               "ARVADOS_API_HOST":  rr.arvClient.APIHost,
+               "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
+       }
+       if rr.arvClient.Insecure {
+               env["ARVADOS_API_HOST_INSECURE"] = "1"
+       }
+       envJSON, err := json.Marshal(env)
+       if err != nil {
+               panic(err)
+       }
+       stdin := bytes.NewBuffer(envJSON)
+       cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+       if rr.remoteUser != "root" {
+               cmd = "sudo " + cmd
+       }
+       stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
+       if err != nil {
+               rr.logger.WithField("stdout", string(stdout)).
+                       WithField("stderr", string(stderr)).
+                       WithError(err).
+                       Error("error starting crunch-run process")
+               return
+       }
+       rr.logger.Info("crunch-run process started")
+}
+
+// Close abandons the remote process (if any) and releases
+// resources. Close must not be called more than once.
+func (rr *remoteRunner) Close() {
+       close(rr.closed)
+}
+
+// Kill starts a background task to kill the remote process, first
+// trying SIGTERM until reaching timeoutTERM, then calling
+// onUnkillable().
+//
+// SIGKILL is not used. It would merely kill the crunch-run supervisor
+// and thereby make the docker container, arv-mount, etc. invisible to
+// us without actually stopping them.
+//
+// Once Kill has been called, calling it again has no effect.
+func (rr *remoteRunner) Kill(reason string) {
+       if rr.stopping {
+               return
+       }
+       rr.stopping = true
+       rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
+       go func() {
+               termDeadline := time.Now().Add(rr.timeoutTERM)
+               t := time.NewTicker(rr.timeoutSignal)
+               defer t.Stop()
+               for range t.C {
+                       switch {
+                       case rr.isClosed():
+                               return
+                       case time.Now().After(termDeadline):
+                               rr.logger.Debug("giving up")
+                               rr.givenup = true
+                               rr.onUnkillable(rr.uuid)
+                               return
+                       default:
+                               rr.kill(syscall.SIGTERM)
+                       }
+               }
+       }()
+}
+
+func (rr *remoteRunner) kill(sig syscall.Signal) {
+       logger := rr.logger.WithField("Signal", int(sig))
+       logger.Info("sending signal")
+       cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+       if rr.remoteUser != "root" {
+               cmd = "sudo " + cmd
+       }
+       stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
+       if err != nil {
+               logger.WithFields(logrus.Fields{
+                       "stderr": string(stderr),
+                       "stdout": string(stdout),
+                       "error":  err,
+               }).Info("kill attempt unsuccessful")
+               return
+       }
+       rr.onKilled(rr.uuid)
+}
+
+func (rr *remoteRunner) isClosed() bool {
+       select {
+       case <-rr.closed:
+               return true
+       default:
+               return false
+       }
+}
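
The Kill/kill pair above is essentially a bounded signal-retry loop: send SIGTERM every timeoutSignal until the remote process is gone or timeoutTERM elapses, then give up and invoke onUnkillable. A minimal, self-contained Go model of that pattern follows; the remote "crunch-run --kill" call is replaced by a stubbed tryTERM callback, and killWithRetries, tryTERM, and the literal durations are illustrative names only, not part of this change.

package main

import (
	"fmt"
	"time"
)

// killWithRetries models the loop in remoteRunner.Kill: tick every
// timeoutSignal, stop once the process is gone, and give up after
// timeoutTERM. tryTERM stands in for the remote "crunch-run --kill"
// invocation and reports whether the process is still running.
func killWithRetries(timeoutTERM, timeoutSignal time.Duration, tryTERM func() (stillRunning bool)) (killed bool) {
	deadline := time.Now().Add(timeoutTERM)
	t := time.NewTicker(timeoutSignal)
	defer t.Stop()
	for range t.C {
		if time.Now().After(deadline) {
			return false // the real code calls onUnkillable(uuid) here
		}
		if !tryTERM() {
			return true // the real code's kill() calls onKilled(uuid) instead
		}
	}
	return false
}

func main() {
	attempts := 0
	killed := killWithRetries(2*time.Second, 100*time.Millisecond, func() bool {
		attempts++
		return attempts < 3 // pretend the process exits after a couple of signals
	})
	fmt.Println("killed:", killed, "attempts:", attempts)
}
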
index 64e1f7797af8634be63502faea5faaaa8b30a5f9..41117c1d4eafb5aa2a92c163d3f79d72ace443d3 100644 (file)
@@ -6,7 +6,6 @@ package worker
 
 import (
        "bytes"
-       "encoding/json"
        "fmt"
        "strings"
        "sync"
@@ -87,62 +86,60 @@ type worker struct {
        busy         time.Time
        destroyed    time.Time
        lastUUID     string
-       running      map[string]struct{} // remember to update state idle<->running when this changes
-       starting     map[string]struct{} // remember to update state idle<->running when this changes
+       running      map[string]*remoteRunner // remember to update state idle<->running when this changes
+       starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
        probing      chan struct{}
 }
 
+func (wkr *worker) onUnkillable(uuid string) {
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       logger := wkr.logger.WithField("ContainerUUID", uuid)
+       if wkr.idleBehavior == IdleBehaviorHold {
+               logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+               return
+       }
+       logger.Warn("unkillable container, draining worker")
+       wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       wkr.closeRunner(uuid)
+       go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+       wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+       wkr.idleBehavior = idleBehavior
+       wkr.saveTags()
+       wkr.shutdownIfIdle()
+}
+
 // caller must have lock.
 func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
-       logger = logger.WithField("Instance", wkr.instance.ID())
        logger.Debug("starting container")
-       wkr.starting[ctr.UUID] = struct{}{}
+       rr := newRemoteRunner(ctr.UUID, wkr)
+       wkr.starting[ctr.UUID] = rr
        if wkr.state != StateRunning {
                wkr.state = StateRunning
                go wkr.wp.notify()
        }
        go func() {
-               env := map[string]string{
-                       "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
-                       "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
-               }
-               if wkr.wp.arvClient.Insecure {
-                       env["ARVADOS_API_HOST_INSECURE"] = "1"
-               }
-               envJSON, err := json.Marshal(env)
-               if err != nil {
-                       panic(err)
-               }
-               stdin := bytes.NewBuffer(envJSON)
-               cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
-               if u := wkr.instance.RemoteUser(); u != "root" {
-                       cmd = "sudo " + cmd
-               }
-               stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
+               rr.Start()
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
-               wkr.running[ctr.UUID] = struct{}{}
-               wkr.lastUUID = ctr.UUID
-               if err != nil {
-                       logger.WithField("stdout", string(stdout)).
-                               WithField("stderr", string(stderr)).
-                               WithError(err).
-                               Error("error starting crunch-run process")
-                       // Leave uuid in wkr.running, though: it's
-                       // possible the error was just a communication
-                       // failure and the process was in fact
-                       // started.  Wait for next probe to find out.
-                       return
-               }
-               logger.Info("crunch-run process started")
+               wkr.running[ctr.UUID] = rr
                wkr.lastUUID = ctr.UUID
        }()
 }
@@ -274,31 +271,8 @@ func (wkr *worker) probeAndUpdate() {
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }
-       changed := false
 
-       // Build a new "running" map. Set changed=true if it differs
-       // from the existing map (wkr.running) to ensure the scheduler
-       // gets notified below.
-       running := map[string]struct{}{}
-       for _, uuid := range ctrUUIDs {
-               running[uuid] = struct{}{}
-               if _, ok := wkr.running[uuid]; !ok {
-                       if _, ok := wkr.starting[uuid]; !ok {
-                               // We didn't start it -- it must have
-                               // been started by a previous
-                               // dispatcher process.
-                               logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
-                       }
-                       changed = true
-               }
-       }
-       for uuid := range wkr.running {
-               if _, ok := running[uuid]; !ok {
-                       logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
-                       wkr.wp.notifyExited(uuid, updateTime)
-                       changed = true
-               }
-       }
+       changed := wkr.updateRunning(ctrUUIDs)
 
        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
@@ -317,14 +291,13 @@ func (wkr *worker) probeAndUpdate() {
 
        // Log whenever a run-probe reveals crunch-run processes
        // appearing/disappearing before boot-probe succeeds.
-       if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+       if wkr.state == StateUnknown && changed {
                logger.WithFields(logrus.Fields{
-                       "RunningContainers": len(running),
+                       "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }
 
-       wkr.running = running
        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
@@ -333,7 +306,7 @@ func (wkr *worker) probeAndUpdate() {
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
                logger.WithFields(logrus.Fields{
-                       "RunningContainers": len(running),
+                       "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
@@ -402,27 +375,52 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        return true
 }
 
+// Returns true if the instance is eligible for shutdown: either it's
+// been idle too long, or idleBehavior=Drain and nothing is running.
+//
 // caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
-               // Never shut down.
                return false
        }
-       age := time.Since(wkr.busy)
-
-       old := age >= wkr.wp.timeoutIdle
        draining := wkr.idleBehavior == IdleBehaviorDrain
-       shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
-               (draining && wkr.state == StateBooting)
-       if !shouldShutdown {
+       switch wkr.state {
+       case StateBooting:
+               return draining
+       case StateIdle:
+               return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+       case StateRunning:
+               if !draining {
+                       return false
+               }
+               for _, rr := range wkr.running {
+                       if !rr.givenup {
+                               return false
+                       }
+               }
+               for _, rr := range wkr.starting {
+                       if !rr.givenup {
+                               return false
+                       }
+               }
+               // draining, and all remaining runners are just trying
+               // to force-kill their crunch-run procs
+               return true
+       default:
                return false
        }
+}
 
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+       if !wkr.eligibleForShutdown() {
+               return false
+       }
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
-               "IdleDuration": stats.Duration(age),
+               "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
-       }).Info("shutdown idle worker")
+       }).Info("shutdown worker")
        wkr.shutdown()
        return true
 }
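
The new eligibleForShutdown folds the old idle-timeout test together with drain handling: Hold never shuts down; Drain permits shutdown while booting, while idle, or once every remaining runner has given up on SIGTERM. A self-contained sketch of the same decision table, using simplified stand-ins (state, hold, drain, givenup) rather than the real worker/remoteRunner types:

package main

import (
	"fmt"
	"time"
)

type state int

const (
	booting state = iota
	idle
	running
)

// eligible mirrors the rules in worker.eligibleForShutdown: Hold never
// shuts down; Drain permits shutdown while booting, while idle, or once
// every remaining runner has given up on SIGTERM; otherwise only an
// idle timeout qualifies. hold, drain, idleFor and givenup are
// simplified stand-ins for the real worker/remoteRunner fields.
func eligible(hold, drain bool, st state, idleFor, timeoutIdle time.Duration, givenup []bool) bool {
	if hold {
		return false
	}
	switch st {
	case booting:
		return drain
	case idle:
		return drain || idleFor >= timeoutIdle
	case running:
		if !drain {
			return false
		}
		for _, g := range givenup {
			if !g {
				return false
			}
		}
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(eligible(false, false, idle, 2*time.Minute, time.Minute, nil))  // true: idle longer than timeoutIdle
	fmt.Println(eligible(false, true, running, 0, time.Minute, []bool{false})) // false: a runner is still being TERMed
	fmt.Println(eligible(false, true, running, 0, time.Minute, []bool{true}))  // true: all runners gave up, instance can drain away
}
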
@@ -468,3 +466,68 @@ func (wkr *worker) saveTags() {
                }()
        }
 }
+
+func (wkr *worker) Close() {
+       // This might take time, so do it after unlocking mtx.
+       defer wkr.executor.Close()
+
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       for uuid, rr := range wkr.running {
+               wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+               rr.Close()
+       }
+       for uuid, rr := range wkr.starting {
+               wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+               rr.Close()
+       }
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+       alive := map[string]bool{}
+       for _, uuid := range ctrUUIDs {
+               alive[uuid] = true
+               if _, ok := wkr.running[uuid]; ok {
+                       // unchanged
+               } else if rr, ok := wkr.starting[uuid]; ok {
+                       wkr.running[uuid] = rr
+                       delete(wkr.starting, uuid)
+                       changed = true
+               } else {
+                       // We didn't start it -- it must have been
+                       // started by a previous dispatcher process.
+                       wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+                       wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+                       changed = true
+               }
+       }
+       for uuid := range wkr.running {
+               if !alive[uuid] {
+                       wkr.closeRunner(uuid)
+                       changed = true
+               }
+       }
+       return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+       rr := wkr.running[uuid]
+       if rr == nil {
+               return
+       }
+       wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+       delete(wkr.running, uuid)
+       rr.Close()
+
+       now := time.Now()
+       wkr.updated = now
+       wkr.wp.exited[uuid] = now
+       if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+               wkr.state = StateIdle
+       }
+}
index 3bc33b62c9fee896f107278a564b859d1448366e..15a2a894c5bceb89bdae4f6c5e4146d317dca083 100644 (file)
@@ -209,12 +209,17 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                        busy:     ctime,
                        probed:   ctime,
                        updated:  ctime,
+                       running:  map[string]*remoteRunner{},
+                       starting: map[string]*remoteRunner{},
+                       probing:  make(chan struct{}, 1),
                }
                if trial.running > 0 {
-                       wkr.running = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+                       uuid := "zzzzz-dz642-abcdefghijklmno"
+                       wkr.running = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
                }
                if trial.starting > 0 {
-                       wkr.starting = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+                       uuid := "zzzzz-dz642-bcdefghijklmnop"
+                       wkr.starting = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
                }
                wkr.probeAndUpdate()
                c.Check(wkr.state, check.Equals, trial.expectState)
index c7e20e2a72947d2e74f147e6a6c0fd68d14254f8..60aeb1892b11e13c980c8c2a97e10da9fbc7a639 100644 (file)
@@ -33,8 +33,8 @@ Gem::Specification.new do |s|
   s.add_runtime_dependency 'arvados', '~> 1.3.0', '>= 1.3.0'
   # Our google-api-client dependency used to be < 0.9, but that could be
   # satisfied by the buggy 0.9.pre*.  https://dev.arvados.org/issues/9213
-  s.add_runtime_dependency 'cure-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
-  s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5'
+  s.add_runtime_dependency 'arvados-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
+  s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5.1'
   s.add_runtime_dependency 'json', '>= 1.7.7', '<3'
   s.add_runtime_dependency 'optimist', '~> 3.0'
   s.add_runtime_dependency 'andand', '~> 1.3', '>= 1.3.3'
index 834ca195fdda02d859eafcd4aa0cea5c70c1c359..95711762c9a421a94c3581b165d9dbd6522a99e6 100644 (file)
@@ -293,7 +293,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
         logger.exception("Error creating the Arvados CWL Executor")
         return 1
 
-    # Note that unless in debug mode, some stack traces related to user 
+    # Note that unless in debug mode, some stack traces related to user
     # workflow errors may be suppressed. See ArvadosJob.done().
     if arvargs.debug:
         logger.setLevel(logging.DEBUG)
index 03b4e07c76f5849a97ae85b9bd179e897ec8fc33..b194f3dfcaa0be1cf676ea771e008cbcc9b0b2c0 100644 (file)
@@ -123,6 +123,8 @@ class ArvadosContainer(JobBase):
                 "kind": "collection",
                 "portable_data_hash": pdh
             }
+            if pdh in self.pathmapper.pdh_to_uuid:
+                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
             if len(sp) == 2:
                 if tp == "Directory":
                     path = sp[1]
@@ -329,8 +331,8 @@ class ArvadosContainer(JobBase):
             else:
                 processStatus = "permanentFail"
 
-            if processStatus == "permanentFail":
-                logc = arvados.collection.CollectionReader(container["log"],
+            if processStatus == "permanentFail" and record["log_uuid"]:
+                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
@@ -353,8 +355,8 @@ class ArvadosContainer(JobBase):
             if container["output"]:
                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
         except WorkflowException as e:
-            # Only include a stack trace if in debug mode. 
-            # A stack trace may obfuscate more useful output about the workflow. 
+            # Only include a stack trace if in debug mode.
+            # A stack trace may obfuscate more useful output about the workflow.
             logger.error("%s unable to collect output from %s:\n%s",
                          self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
             processStatus = "permanentFail"
index 319e8a887114b88b55865ca673dbafb3e0b9a7dc..c35842616696f7dc00d047969ea74f27aad332df 100644 (file)
@@ -87,7 +87,7 @@ class RuntimeStatusLoggingHandler(logging.Handler):
                     )
             finally:
                 self.updatingRuntimeStatus = False
-            
+
 
 class ArvCwlExecutor(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
@@ -475,7 +475,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
         with final.open("cwl.output.json", "w") as f:
             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
-            f.write(res)           
+            f.write(res)
 
         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
 
index 3744b4a93afa40df10c17335310503500acd2432..252ca57d47bb30ad1834e7070b3cfdf1e0ffdbb1 100644 (file)
@@ -63,24 +63,27 @@ class CollectionCache(object):
             del self.collections[pdh]
             self.total -= v[1]
 
-    def get(self, pdh):
+    def get(self, locator):
         with self.lock:
-            if pdh not in self.collections:
-                m = pdh_size.match(pdh)
+            if locator not in self.collections:
+                m = pdh_size.match(locator)
                 if m:
                     self.cap_cache(int(m.group(2)) * 128)
-                logger.debug("Creating collection reader for %s", pdh)
-                cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
-                                                         keep_client=self.keep_client,
-                                                         num_retries=self.num_retries)
+                logger.debug("Creating collection reader for %s", locator)
+                try:
+                    cr = arvados.collection.CollectionReader(locator, api_client=self.api_client,
+                                                             keep_client=self.keep_client,
+                                                             num_retries=self.num_retries)
+                except arvados.errors.ApiError as ap:
+                    raise IOError(errno.ENOENT, "Could not access collection '%s': %s" % (locator, str(ap._get_reason())))
                 sz = len(cr.manifest_text()) * 128
-                self.collections[pdh] = (cr, sz)
+                self.collections[locator] = (cr, sz)
                 self.total += sz
             else:
-                cr, sz = self.collections[pdh]
+                cr, sz = self.collections[locator]
                 # bump it to the back
-                del self.collections[pdh]
-                self.collections[pdh] = (cr, sz)
+                del self.collections[locator]
+                self.collections[locator] = (cr, sz)
             return cr
 
 
@@ -94,9 +97,10 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
     def get_collection(self, path):
         sp = path.split("/", 1)
         p = sp[0]
-        if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
-            pdh = p[5:]
-            return (self.collection_cache.get(pdh), urllib.parse.unquote(sp[1]) if len(sp) == 2 else None)
+        if p.startswith("keep:") and (arvados.util.keep_locator_pattern.match(p[5:]) or
+                                      arvados.util.collection_uuid_pattern.match(p[5:])):
+            locator = p[5:]
+            return (self.collection_cache.get(locator), urllib.parse.unquote(sp[1]) if len(sp) == 2 else None)
         else:
             return (None, path)
 
@@ -261,9 +265,11 @@ class CollectionFetcher(DefaultFetcher):
             baseparts = basesp.path.split("/")
             urlparts = urlsp.path.split("/") if urlsp.path else []
 
-            pdh = baseparts.pop(0)
+            locator = baseparts.pop(0)
 
-            if basesp.scheme == "keep" and not arvados.util.keep_locator_pattern.match(pdh):
+            if (basesp.scheme == "keep" and
+                (not arvados.util.keep_locator_pattern.match(locator)) and
+                (not arvados.util.collection_uuid_pattern.match(locator))):
                 raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
 
             if urlsp.path.startswith("/"):
@@ -273,7 +279,7 @@ class CollectionFetcher(DefaultFetcher):
             if baseparts and urlsp.path:
                 baseparts.pop()
 
-            path = "/".join([pdh] + baseparts + urlparts)
+            path = "/".join([locator] + baseparts + urlparts)
             return urllib.parse.urlunsplit((basesp.scheme, "", path, "", urlsp.fragment))
 
         return super(CollectionFetcher, self).urljoin(base_url, url)
index e0445febdc9a0731314607417739747c8f0e632c..38135899dca7c66a10a135c85d2c0f8db43b0f2a 100644 (file)
@@ -58,6 +58,7 @@ class ArvPathMapper(PathMapper):
         self.name = name
         self.referenced_files = [r["location"] for r in referenced_files]
         self.single_collection = single_collection
+        self.pdh_to_uuid = {}
         super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
 
     def visit(self, srcobj, uploadfiles):
@@ -67,6 +68,8 @@ class ArvPathMapper(PathMapper):
 
         if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
             self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
+            if arvados_cwl.util.collectionUUID in srcobj:
+                self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
 
         debug = logger.isEnabledFor(logging.DEBUG)
 
index e515ac2ce5e99f4ec75011b8ac51bfe2fc1bbff8..9385bde63c4aa60b55de22d7c8b87908039444e3 100644 (file)
@@ -8,6 +8,7 @@ from future.utils import  viewvalues, viewitems
 
 import os
 import sys
+import re
 import urllib.parse
 from functools import partial
 import logging
@@ -30,8 +31,10 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
 from cwltool.builder import substitute
 from cwltool.pack import pack
+import schema_salad.validate as validate
 
 import arvados.collection
+from .util import collectionUUID
 import ruamel.yaml as yaml
 
 import arvados_cwl.arvdocker
@@ -87,6 +90,8 @@ def discover_secondary_files(inputs, job_order, discovered=None):
         if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
             setSecondary(t, job_order[shortname(t["id"])], discovered)
 
+collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
+collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run,
@@ -136,14 +141,55 @@ def upload_dependencies(arvrunner, name, document_loader,
                   loadref, urljoin=document_loader.fetcher.urljoin)
 
     sc = []
-    def only_real(obj):
-        # Only interested in local files than need to be uploaded,
-        # don't include file literals, keep references, etc.
-        sp = obj.get("location", "").split(":")
-        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
+    uuids = {}
+
+    def collect_uuids(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if sp[0] == "keep":
+            # Collect collection uuids that need to be resolved to
+            # portable data hashes
+            gp = collection_uuid_pattern.match(loc)
+            if gp:
+                uuids[gp.groups()[0]] = obj
+            if collectionUUID in obj:
+                uuids[obj[collectionUUID]] = obj
+
+    def collect_uploads(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if len(sp) < 1:
+            return
+        if sp[0] in ("file", "http", "https"):
+            # Record local files that need to be uploaded;
+            # don't include file literals, keep references, etc.
             sc.append(obj)
+        collect_uuids(obj)
 
-    visit_class(sc_result, ("File", "Directory"), only_real)
+    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+
+    # Resolve any collection uuids we found to portable data hashes
+    # and assign them to uuid_map
+    uuid_map = {}
+    fetch_uuids = list(uuids.keys())
+    while fetch_uuids:
+        # For a large number of fetch_uuids, the API server may limit
+        # response size, so keep fetching until the API server has
+        # nothing more to give us.
+        lookups = arvrunner.api.collections().list(
+            filters=[["uuid", "in", fetch_uuids]],
+            count="none",
+            select=["uuid", "portable_data_hash"]).execute(
+                num_retries=arvrunner.num_retries)
+
+        if not lookups["items"]:
+            break
+
+        for l in lookups["items"]:
+            uuid_map[l["uuid"]] = l["portable_data_hash"]
+
+        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
@@ -194,8 +240,37 @@ def upload_dependencies(arvrunner, name, document_loader,
                            single_collection=True)
 
     def setloc(p):
-        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+        loc = p.get("location")
+        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
+            return
+
+        if not loc:
+            return
+
+        if collectionUUID in p:
+            uuid = p[collectionUUID]
+            if uuid not in uuid_map:
+                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                    "Collection uuid %s not found" % uuid)
+            gp = collection_pdh_pattern.match(loc)
+            if gp and uuid_map[uuid] != gp.groups()[0]:
+                # This file entry has both collectionUUID and a PDH
+                # location. If the PDH doesn't match the one returned
+                # by the API server, raise an error.
+                raise SourceLine(p, "location", validate.ValidationException).makeError(
+                    "Expected collection uuid %s to be %s but API server reported %s" % (
+                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
+        gp = collection_uuid_pattern.match(loc)
+        if not gp:
+            return
+        uuid = gp.groups()[0]
+        if uuid not in uuid_map:
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
+        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+        p[collectionUUID] = uuid
 
     visit_class(workflowobj, ("File", "Directory"), setloc)
     visit_class(discovered, ("File", "Directory"), setloc)
index 776fc6bc25dae06e232e2546cab501246d6cd6b3..85ae65ecf18c327aed6d9b3f2ddb5182dcc05b08 100644 (file)
@@ -5,6 +5,8 @@
 import datetime
 from arvados.errors import ApiError
 
+collectionUUID =  "http://arvados.org/cwl#collectionUUID"
+
 def get_intermediate_collection_info(workflow_step_name, current_container, intermediate_output_ttl):
         if workflow_step_name:
             name = "Intermediate collection for step %s" % (workflow_step_name)
@@ -30,5 +32,5 @@ def get_current_container(api, num_retries=0, logger=None):
             if logger:
                 logger.info("Getting current container: %s", e)
             raise e
-            
+
     return current_container
diff --git a/sdk/cwl/tests/submit_test_job_with_inconsistent_uuids.json b/sdk/cwl/tests/submit_test_job_with_inconsistent_uuids.json
new file mode 100644 (file)
index 0000000..233a9fc
--- /dev/null
@@ -0,0 +1,25 @@
+{
+    "x": {
+        "class": "File",
+        "path": "input/blorp.txt"
+    },
+    "y": {
+        "class": "Directory",
+        "location": "keep:99999999999999999999999999999998+99",
+        "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+        "listing": [{
+            "class": "File",
+            "location": "keep:99999999999999999999999999999997+99/file1.txt",
+            "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
+        }]
+    },
+    "z": {
+        "class": "Directory",
+        "basename": "anonymous",
+        "listing": [{
+            "basename": "renamed.txt",
+            "class": "File",
+            "location": "keep:99999999999999999999999999999998+99/file1.txt"
+        }]
+    }
+}
diff --git a/sdk/cwl/tests/submit_test_job_with_mismatched_uuids.json b/sdk/cwl/tests/submit_test_job_with_mismatched_uuids.json
new file mode 100644 (file)
index 0000000..72eb911
--- /dev/null
@@ -0,0 +1,26 @@
+{
+    "x": {
+        "class": "File",
+        "path": "input/blorp.txt"
+    },
+    "y": {
+        "class": "Directory",
+        "location": "keep:99999999999999999999999999999998+99",
+        "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+        "listing": [{
+            "class": "File",
+            "location": "keep:99999999999999999999999999999998+99/file1.txt",
+            "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
+        }]
+    },
+    "z": {
+        "class": "Directory",
+        "basename": "anonymous",
+        "listing": [{
+            "basename": "renamed.txt",
+            "class": "File",
+            "location": "keep:99999999999999999999999999999998+99/file1.txt",
+            "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
+        }]
+    }
+}
diff --git a/sdk/cwl/tests/submit_test_job_with_uuids.json b/sdk/cwl/tests/submit_test_job_with_uuids.json
new file mode 100644 (file)
index 0000000..82d3e2d
--- /dev/null
@@ -0,0 +1,23 @@
+{
+    "x": {
+        "class": "File",
+        "path": "input/blorp.txt"
+    },
+    "y": {
+        "class": "Directory",
+        "location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz",
+        "listing": [{
+            "class": "File",
+            "location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/file1.txt"
+        }]
+    },
+    "z": {
+        "class": "Directory",
+        "basename": "anonymous",
+        "listing": [{
+            "basename": "renamed.txt",
+            "class": "File",
+            "location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/file1.txt"
+        }]
+    }
+}
index 1a57da3927a352e614f5a65ebb46887864ece07b..07d962bf9b55e2d33efab5b470b3146cab8c49f9 100644 (file)
@@ -80,7 +80,7 @@ class TestContainer(unittest.TestCase):
 
         return loadingContext, runtimeContext
 
-    # Helper function to set up the ArvCwlExecutor to use the containers api 
+    # Helper function to set up the ArvCwlExecutor to use the containers api
     # and test that the RuntimeStatusLoggingHandler is set up correctly
     def setup_and_test_container_executor_and_logging(self, gcc_mock) :
         api = mock.MagicMock()
@@ -96,7 +96,7 @@ class TestContainer(unittest.TestCase):
         handlerClasses = [h.__class__ for h in root_logger.handlers]
         self.assertTrue(arvados_cwl.RuntimeStatusLoggingHandler in handlerClasses)
         return runner
-        
+
     # The test passes no builder.resources
     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
@@ -527,9 +527,9 @@ class TestContainer(unittest.TestCase):
         # get_current_container is invoked when we call runtime_status_update
         # so try and log again!
         gcc_mock.side_effect = lambda *args: root_logger.error("Second Error")
-        try: 
+        try:
             root_logger.error("First Error")
-        except RuntimeError: 
+        except RuntimeError:
             self.fail("RuntimeStatusLoggingHandler should not be called recursively")
 
     @mock.patch("arvados_cwl.ArvCwlExecutor.runtime_status_update")
@@ -538,7 +538,7 @@ class TestContainer(unittest.TestCase):
     @mock.patch("arvados.collection.Collection")
     def test_child_failure(self, col, reader, gcc_mock, rts_mock):
         runner = self.setup_and_test_container_executor_and_logging(gcc_mock)
-        
+
         gcc_mock.return_value = {"uuid" : "zzzzz-dz642-zzzzzzzzzzzzzzz"}
         self.assertTrue(gcc_mock.called)
 
@@ -630,6 +630,7 @@ class TestContainer(unittest.TestCase):
             "p1": {
                 "class": "Directory",
                 "location": "keep:99999999999999999999999999999994+44",
+                "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
                 "listing": [
                     {
                         "class": "File",
@@ -660,7 +661,8 @@ class TestContainer(unittest.TestCase):
                     'mounts': {
                         "/keep/99999999999999999999999999999994+44": {
                             "kind": "collection",
-                            "portable_data_hash": "99999999999999999999999999999994+44"
+                            "portable_data_hash": "99999999999999999999999999999994+44",
+                            "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
                         },
                         '/tmp': {'kind': 'tmp',
                                  "capacity": 1073741824 },
index 39117d86e3ca976ffaa19e1e5596e37bf018b842..9535f6ba206bfadf267dda54bbad19fcecf0f961 100644 (file)
@@ -18,7 +18,16 @@ import mock
 import sys
 import unittest
 
-from io import BytesIO, StringIO
+from io import BytesIO
+
+# StringIO.StringIO and io.StringIO behave differently when write() is
+# called with both python2 (byte) strings and unicode strings
+# (specifically, there's some logging in cwltool that causes trouble).
+# This isn't a problem on python3 because all strings are unicode.
+if sys.version_info[0] < 3:
+    from StringIO import StringIO
+else:
+    from io import StringIO
 
 import arvados
 import arvados.collection
@@ -112,6 +121,11 @@ def stubs(func):
                 "portable_data_hash": "99999999999999999999999999999998+99",
                 "manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
             },
+            "99999999999999999999999999999997+99": {
+                "uuid": "",
+                "portable_data_hash": "99999999999999999999999999999997+99",
+                "manifest_text": ". 99999999999999999999999999999997+99 0:0:file1.txt"
+            },
             "99999999999999999999999999999994+99": {
                 "uuid": "",
                 "portable_data_hash": "99999999999999999999999999999994+99",
@@ -1425,6 +1439,96 @@ class TestSubmit(unittest.TestCase):
             stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
         self.assertEqual(exited, 1)
 
+    @mock.patch("arvados.collection.CollectionReader")
+    @stubs
+    def test_submit_uuid_inputs(self, stubs, collectionReader):
+        collectionReader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "file1.txt")
+        def list_side_effect(**kwargs):
+            m = mock.MagicMock()
+            if "count" in kwargs:
+                m.execute.return_value = {"items": [
+                    {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz", "portable_data_hash": "99999999999999999999999999999998+99"}
+                ]}
+            else:
+                m.execute.return_value = {"items": []}
+            return m
+        stubs.api.collections().list.side_effect = list_side_effect
+
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+                "tests/wf/submit_wf.cwl", "tests/submit_test_job_with_uuids.json"],
+            stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['y']['basename'] = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
+        expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['y']['http://arvados.org/cwl#collectionUUID'] = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
+        expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['z']['listing'][0]['http://arvados.org/cwl#collectionUUID'] = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
+
+        stubs.api.collections().list.assert_has_calls([
+            mock.call(count='none',
+                      filters=[['uuid', 'in', ['zzzzz-4zz18-zzzzzzzzzzzzzzz']]],
+                      select=['uuid', 'portable_data_hash'])])
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(stubs.capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+        self.assertEqual(exited, 0)
+
+    @stubs
+    def test_submit_mismatched_uuid_inputs(self, stubs):
+        def list_side_effect(**kwargs):
+            m = mock.MagicMock()
+            if "count" in kwargs:
+                m.execute.return_value = {"items": [
+                    {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz", "portable_data_hash": "99999999999999999999999999999997+99"}
+                ]}
+            else:
+                m.execute.return_value = {"items": []}
+            return m
+        stubs.api.collections().list.side_effect = list_side_effect
+
+        for infile in ("tests/submit_test_job_with_mismatched_uuids.json", "tests/submit_test_job_with_inconsistent_uuids.json"):
+            capture_stderr = StringIO()
+            cwltool_logger = logging.getLogger('cwltool')
+            stderr_logger = logging.StreamHandler(capture_stderr)
+            cwltool_logger.addHandler(stderr_logger)
+
+            try:
+                exited = arvados_cwl.main(
+                    ["--submit", "--no-wait", "--api=containers", "--debug",
+                        "tests/wf/submit_wf.cwl", infile],
+                    stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+                self.assertEqual(exited, 1)
+                self.assertRegexpMatches(
+                    capture_stderr.getvalue(),
+                    r"Expected collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz to be 99999999999999999999999999999998\+99 but API server reported 99999999999999999999999999999997\+99")
+            finally:
+                cwltool_logger.removeHandler(stderr_logger)
+
+    @mock.patch("arvados.collection.CollectionReader")
+    @stubs
+    def test_submit_unknown_uuid_inputs(self, stubs, collectionReader):
+        collectionReader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "file1.txt")
+        capture_stderr = StringIO()
+
+        cwltool_logger = logging.getLogger('cwltool')
+        stderr_logger = logging.StreamHandler(capture_stderr)
+        cwltool_logger.addHandler(stderr_logger)
+
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+                "tests/wf/submit_wf.cwl", "tests/submit_test_job_with_uuids.json"],
+            stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+        try:
+            self.assertEqual(exited, 1)
+            self.assertRegexpMatches(
+                capture_stderr.getvalue(),
+                r"Collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz not found")
+        finally:
+            cwltool_logger.removeHandler(stderr_logger)
+
 
 class TestCreateTemplate(unittest.TestCase):
     existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
@@ -1609,22 +1713,24 @@ class TestCreateWorkflow(unittest.TestCase):
 
     @stubs
     def test_incompatible_api(self, stubs):
-        capture_stderr = io.StringIO()
+        capture_stderr = StringIO()
         acr_logger = logging.getLogger('arvados.cwl-runner')
         stderr_logger = logging.StreamHandler(capture_stderr)
         acr_logger.addHandler(stderr_logger)
 
-        exited = arvados_cwl.main(
-            ["--update-workflow", self.existing_workflow_uuid,
-             "--api=jobs",
-             "--debug",
-             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
-            sys.stderr, sys.stderr, api_client=stubs.api)
-        self.assertEqual(exited, 1)
-        self.assertRegexpMatches(
-            capture_stderr.getvalue(),
-            "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
-        acr_logger.removeHandler(stderr_logger)
+        try:
+            exited = arvados_cwl.main(
+                ["--update-workflow", self.existing_workflow_uuid,
+                 "--api=jobs",
+                 "--debug",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                sys.stderr, sys.stderr, api_client=stubs.api)
+            self.assertEqual(exited, 1)
+            self.assertRegexpMatches(
+                capture_stderr.getvalue(),
+                "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
+        finally:
+            acr_logger.removeHandler(stderr_logger)
 
     @stubs
     def test_update(self, stubs):
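
    The tests above capture logger output by attaching a temporary StreamHandler and
    detaching it in a finally block, so a failed assertion cannot leak the handler into
    later tests. A minimal sketch of that pattern outside the test suite (the logger
    name and message are illustrative):

    import logging
    from io import StringIO

    capture = StringIO()
    handler = logging.StreamHandler(capture)
    logger = logging.getLogger("example")       # illustrative logger name
    logger.addHandler(handler)
    try:
        logger.error("something went wrong")    # code under test would log here
        assert "something went wrong" in capture.getvalue()
    finally:
        # Always detach, even if the assertion fails, so other tests are
        # not affected by the extra handler.
        logger.removeHandler(handler)
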
index f16f98a943cdbe2f0501a35d95cb3e45e9c9d5a9..73addb739cd0e25b47212300d420fa4f2b8c8e7d 100644 (file)
@@ -122,6 +122,12 @@ type Dispatch struct {
 
        // Maximum total worker probes per second
        MaxProbesPerSecond int
+
+       // Time before repeating SIGTERM when killing a container
+       TimeoutSignal Duration
+
+       // Time to give up on SIGTERM and write off the worker
+       TimeoutTERM Duration
 }
 
 type CloudVMs struct {
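
    TimeoutSignal and TimeoutTERM describe an escalation policy: repeat SIGTERM at one
    interval, and give up (writing the worker off) after a longer one. The dispatcher
    implements this in Go; the sketch below only illustrates the timing logic, and the
    send_sigterm/container_gone callbacks are hypothetical stand-ins for the real code:

    import time

    def terminate(send_sigterm, container_gone,
                  timeout_signal=5.0, timeout_term=60.0):
        # Resend SIGTERM every timeout_signal seconds; give up entirely after
        # timeout_term seconds and let the caller write off the worker.
        deadline = time.time() + timeout_term
        while time.time() < deadline:
            send_sigterm()
            resend_at = time.time() + timeout_signal
            while time.time() < min(resend_at, deadline):
                if container_gone():
                    return True
                time.sleep(0.1)
        return False
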
index da919309f4e829f227f3241eb7d41759087dde08..d4f04eb370659fac2ade716e302a301ea7494577 100644 (file)
@@ -29,7 +29,7 @@ Gem::Specification.new do |s|
   s.add_dependency('andand', '~> 1.3', '>= 1.3.3')
   # Our google-api-client dependency used to be < 0.9, but that could be
   # satisfied by the buggy 0.9.pre*.  https://dev.arvados.org/issues/9213
-  s.add_dependency('cure-google-api-client', '>= 0.7', '< 0.8.9')
+  s.add_dependency('arvados-google-api-client', '>= 0.7', '< 0.8.9')
   # work around undeclared dependency on i18n in some activesupport 3.x.x:
   s.add_dependency('i18n', '~> 0')
   s.add_dependency('json', '>= 1.7.7', '<3')
index 6b1bf795ed7eced1e872809358150c324e234239..cf0261585a9a77cdace3268918f9f4c5614bff5c 100644 (file)
@@ -122,11 +122,28 @@ class GroupsTest < ActionDispatch::IntegrationTest
     assert_includes coll_uuids, collections(:foo_collection_in_aproject).uuid
     assert_not_includes coll_uuids, collections(:expired_collection).uuid
   end
+end
+
+class NonTransactionalGroupsTest < ActionDispatch::IntegrationTest
+  # Transactional tests are disabled so that the concurrent asynchronous
+  # permissions update feature can be tested.
+  # This is needed because nested transactions share the connection pool:
+  # one thread blocks while trying to talk to the database until the other
+  # one finishes.
+  self.use_transactional_fixtures = false
+
+  teardown do
+    # Explicitly reset the database after each test.
+    post '/database/reset', {}, auth(:admin)
+    assert_response :success
+  end
 
   test "create request with async=true defers permissions update" do
-    Rails.configuration.async_permissions_update_interval = 1 # seconds
+    Rails.configuration.async_permissions_update_interval = 1 # second
     name = "Random group #{rand(1000)}"
     assert_equal nil, Group.find_by_name(name)
+
+    # Trigger the asynchronous permission update by using the async=true parameter.
     post "/arvados/v1/groups", {
       group: {
         name: name
@@ -134,8 +151,9 @@ class GroupsTest < ActionDispatch::IntegrationTest
       async: true
     }, auth(:active)
     assert_response 202
-    g = Group.find_by_name(name)
-    assert_not_nil g
+
+    # The group exists in the database, but it's not accessible yet.
+    assert_not_nil Group.find_by_name(name)
     get "/arvados/v1/groups", {
       filters: [["name", "=", name]].to_json,
       limit: 10
@@ -143,10 +161,8 @@ class GroupsTest < ActionDispatch::IntegrationTest
     assert_response 200
     assert_equal 0, json_response['items_available']
 
-    # Unblock the thread doing the permissions update
-    ActiveRecord::Base.clear_active_connections!
-
-    sleep(3)
+    # Wait a bit and try again.
+    sleep(1)
     get "/arvados/v1/groups", {
       filters: [["name", "=", name]].to_json,
       limit: 10
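
    The async=true flow exercised above is eventually consistent: the create call
    returns 202 before the permission graph has been updated, so a client that needs
    to see the new group has to retry. A small polling helper in Python illustrates
    the idea (list_groups is a hypothetical callable returning matching items):

    import time

    def wait_until_visible(list_groups, name, timeout=10.0, interval=0.5):
        # Poll until the asynchronously created group shows up, or give up
        # after `timeout` seconds.
        deadline = time.time() + timeout
        while time.time() < deadline:
            items = list_groups(filters=[["name", "=", name]])
            if items:
                return items[0]
            time.sleep(interval)
        raise TimeoutError("group %r not visible after %s seconds" % (name, timeout))
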
index b3c530e69013e91dfb7599c677347ecef3856d78..933692bdc55b3bbf9c63e78c29c5615418a33d05 100644 (file)
@@ -116,14 +116,21 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
 
        proc, err := os.FindProcess(pi.PID)
        if err != nil {
+               // On Unix, FindProcess always succeeds even if the
+               // process does not exist, so an error here is
+               // unexpected.
                return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
        }
 
+       // Send the requested signal once, then send signal 0 a few
+       // times.  When proc.Signal() returns an error (process no
+       // longer exists), return success.  If that doesn't happen
+       // within 1 second, return an error.
        err = proc.Signal(signal)
        for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
                err = proc.Signal(syscall.Signal(0))
        }
        if err == nil {
+               // Reached deadline without a proc.Signal() error.
                return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
        }
        fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
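
    The kill() helper above relies on the POSIX convention that sending signal 0
    performs error checking only: it delivers nothing, but it fails once the target
    process is gone. The same probe can be written in Python; the function below is an
    illustrative sketch, not the crunch-run implementation:

    import errno
    import os
    import signal
    import time

    def wait_for_exit(pid, sig=signal.SIGTERM, deadline=1.0):
        # Send the requested signal once, then probe with signal 0 until
        # the process disappears or the deadline passes.
        os.kill(pid, sig)
        end = time.time() + deadline
        while time.time() < end:
            try:
                os.kill(pid, 0)          # probe only; delivers no signal
            except OSError as err:
                if err.errno == errno.ESRCH:
                    return True          # process no longer exists
                raise
            time.sleep(0.01)
        return False                     # still alive after the deadline
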
index 66aebf80d4236d6b575a178641bd5eae8c14a1bc..8c443fd71afd3ddb2aee089df9bcb745b1c3315d 100755 (executable)
@@ -118,6 +118,23 @@ wait_for_arvbox() {
     fi
 }
 
+docker_run_dev() {
+    docker run \
+          "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
+           "--volume=$SSO_ROOT:/usr/src/sso:rw" \
+           "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
+           "--volume=$WORKBENCH2_ROOT:/usr/src/workbench2:rw" \
+           "--volume=$PG_DATA:/var/lib/postgresql:rw" \
+           "--volume=$VAR_DATA:/var/lib/arvados:rw" \
+           "--volume=$PASSENGER:/var/lib/passenger:rw" \
+           "--volume=$GEMS:/var/lib/gems:rw" \
+           "--volume=$PIPCACHE:/var/lib/pip:rw" \
+           "--volume=$NPMCACHE:/var/lib/npm:rw" \
+           "--volume=$GOSTUFF:/var/lib/gopath:rw" \
+           "--volume=$RLIBS:/var/lib/Rlibs:rw" \
+          "$@"
+}
+
 run() {
     CONFIG=$1
     TAG=$2
@@ -220,22 +237,10 @@ run() {
             mkdir -p $VAR_DATA/test
 
             if test "$need_setup" = 1 ; then
-                docker run \
+                docker_run_dev \
                        --detach \
                        --name=$ARVBOX_CONTAINER \
                        --privileged \
-                       "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
-                       "--volume=$SSO_ROOT:/usr/src/sso:rw" \
-                       "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
-                       "--volume=$WORKBENCH2_ROOT:/usr/src/workbench2:rw" \
-                       "--volume=$PG_DATA:/var/lib/postgresql:rw" \
-                       "--volume=$VAR_DATA:/var/lib/arvados:rw" \
-                       "--volume=$PASSENGER:/var/lib/passenger:rw" \
-                       "--volume=$GEMS:/var/lib/gems:rw" \
-                       "--volume=$PIPCACHE:/var/lib/pip:rw" \
-                       "--volume=$NPMCACHE:/var/lib/npm:rw" \
-                       "--volume=$GOSTUFF:/var/lib/gopath:rw" \
-                       "--volume=$RLIBS:/var/lib/Rlibs:rw" \
                       "--env=SVDIR=/etc/test-service" \
                        arvados/arvbox-dev$TAG
 
@@ -264,22 +269,10 @@ run() {
                    GEM_HOME=/var/lib/gems \
                    "$@"
         elif echo "$CONFIG" | grep 'dev$' ; then
-            docker run \
+            docker_run_dev \
                    --detach \
                    --name=$ARVBOX_CONTAINER \
                    --privileged \
-                   "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
-                   "--volume=$SSO_ROOT:/usr/src/sso:rw" \
-                   "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
-                   "--volume=$WORKBENCH2_ROOT:/usr/src/workbench2:rw" \
-                   "--volume=$PG_DATA:/var/lib/postgresql:rw" \
-                   "--volume=$VAR_DATA:/var/lib/arvados:rw" \
-                   "--volume=$PASSENGER:/var/lib/passenger:rw" \
-                   "--volume=$GEMS:/var/lib/gems:rw" \
-                   "--volume=$PIPCACHE:/var/lib/pip:rw" \
-                   "--volume=$NPMCACHE:/var/lib/npm:rw" \
-                   "--volume=$GOSTUFF:/var/lib/gopath:rw" \
-                   "--volume=$RLIBS:/var/lib/Rlibs:rw" \
                    $PUBLIC \
                    arvados/arvbox-dev$TAG
             updateconf
@@ -494,6 +487,46 @@ case "$subcmd" in
         fi
         ;;
 
+    install-root-cert)
+       set -x
+       sudo cp $VAR_DATA/root-cert.pem /usr/local/share/ca-certificates/${ARVBOX_CONTAINER}-testing-cert.crt
+       sudo update-ca-certificates
+       ;;
+
+    devenv)
+       set -x
+       if docker ps -a --filter "status=exited" | grep -E "${ARVBOX_CONTAINER}-devenv$" -q ; then
+           docker start ${ARVBOX_CONTAINER}-devenv
+       elif ! (docker ps -a --filter "status=running" | grep -E "${ARVBOX_CONTAINER}-devenv$" -q) ; then
+           docker_run_dev \
+                 --detach \
+                --name=${ARVBOX_CONTAINER}-devenv \
+                "--env=SVDIR=/etc/devenv-service" \
+                "--volume=$HOME:$HOME:rw" \
+                --volume=/tmp/.X11-unix:/tmp/.X11-unix:rw \
+                arvados/arvbox-dev$TAG
+       fi
+
+       exec docker exec --interactive --tty \
+            -e LINES=$(tput lines) \
+            -e COLUMNS=$(tput cols) \
+            -e TERM=$TERM \
+            -e "ARVBOX_HOME=$HOME" \
+            -e "DISPLAY=$DISPLAY" \
+            --workdir=$PWD \
+            ${ARVBOX_CONTAINER}-devenv \
+            /usr/local/lib/arvbox/devenv.sh "$@"
+       ;;
+
+    devenv-stop)
+       docker stop ${ARVBOX_CONTAINER}-devenv
+       ;;
+
+    devenv-reset)
+       docker stop ${ARVBOX_CONTAINER}-devenv
+       docker rm ${ARVBOX_CONTAINER}-devenv
+       ;;
+
     *)
         echo "Arvados-in-a-box                      http://arvados.org"
         echo
index 162edc927fe04a566422a53aca7735ee1bd31096..1949af435bd2de82c3c9e2398ce58fa873477035 100644 (file)
@@ -20,7 +20,7 @@ RUN apt-get update && \
     linkchecker python3-virtualenv python-virtualenv xvfb iceweasel \
     libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr \
     libsecret-1-dev r-base r-cran-testthat libxml2-dev pandoc \
-    python3-setuptools python3-pip && \
+    python3-setuptools python3-pip openjdk-8-jdk && \
     apt-get clean
 
 ENV RUBYVERSION_MINOR 2.3
@@ -40,7 +40,7 @@ ENV GEM_HOME /var/lib/gems
 ENV GEM_PATH /var/lib/gems
 ENV PATH $PATH:/var/lib/gems/bin
 
-ENV GOVERSION 1.10.1
+ENV GOVERSION 1.11.5
 
 # Install golang binary
 RUN curl -f http://storage.googleapis.com/golang/go${GOVERSION}.linux-amd64.tar.gz | \
@@ -79,7 +79,7 @@ RUN set -e && curl -L -f ${GDURL} | tar -C /usr/local/bin -xzf - geckodriver
 
 RUN pip install -U setuptools
 
-ENV NODEVERSION v6.11.4
+ENV NODEVERSION v8.15.1
 
 # Install nodejs binary
 RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
@@ -100,7 +100,7 @@ ADD crunch-setup.sh gitolite.rc \
     keep-setup.sh common.sh createusers.sh \
     logger runsu.sh waitforpostgres.sh \
     yml_override.py api-setup.sh \
-    go-setup.sh \
+    go-setup.sh devenv.sh \
     /usr/local/lib/arvbox/
 
 ADD runit /etc/runit
index e6e0397b99a1a3c7095f1b23e4d571edfcb915dd..bb0ff76fe8f065c1be45338f677cf0e7cd99b8ed 100644 (file)
@@ -13,3 +13,4 @@ RUN echo "development" > /var/lib/arvados/sso_rails_env
 RUN echo "development" > /var/lib/arvados/workbench_rails_env
 
 RUN mkdir /etc/test-service && ln -sf /var/lib/arvbox/service/postgres /etc/test-service
+RUN mkdir /etc/devenv-service
\ No newline at end of file
index bbd11f03416a9783904a48cb6823136ceb5c0686..36ff49db51b3b011dc6dea0346c530cb27cb6dd9 100644 (file)
@@ -9,6 +9,7 @@ export GEM_PATH=/var/lib/gems
 export npm_config_cache=/var/lib/npm
 export npm_config_cache_min=Infinity
 export R_LIBS=/var/lib/Rlibs
+export HOME=$(getent passwd arvbox | cut -d: -f6)
 
 if test -s /var/run/localip_override ; then
     localip=$(cat /var/run/localip_override)
index a4689f004aeebab31c71f0dbe35833ec99df7b89..e9721fd55d87c1e5a597f3a56b632310321bb8d1 100755 (executable)
@@ -13,9 +13,13 @@ if ! grep "^arvbox:" /etc/passwd >/dev/null 2>/dev/null ; then
           /var/lib/passenger /var/lib/gopath \
           /var/lib/pip /var/lib/npm
 
+    if test -z "$ARVBOX_HOME" ; then
+       ARVBOX_HOME=/var/lib/arvados
+    fi
+
     groupadd --gid $HOSTGID --non-unique arvbox
     groupadd --gid $HOSTGID --non-unique git
-    useradd --home-dir /var/lib/arvados \
+    useradd --home-dir $ARVBOX_HOME \
             --uid $HOSTUID --gid $HOSTGID \
             --non-unique \
             --groups docker \
@@ -36,6 +40,17 @@ if ! grep "^arvbox:" /etc/passwd >/dev/null 2>/dev/null ; then
     chown crunch:crunch -R /tmp/crunch0 /tmp/crunch1
 
     echo "arvbox    ALL=(crunch) NOPASSWD: ALL" >> /etc/sudoers
+
+    cat <<EOF > /etc/profile.d/paths.sh
+export PATH=/usr/local/bin:/usr/bin:/bin:/usr/local/go/bin:/var/lib/gems/bin:$(ls -d /usr/local/node-*)/bin
+export GEM_HOME=/var/lib/gems
+export GEM_PATH=/var/lib/gems
+export npm_config_cache=/var/lib/npm
+export npm_config_cache_min=Infinity
+export R_LIBS=/var/lib/Rlibs
+export GOPATH=/var/lib/gopath
+EOF
+
 fi
 
 if ! grep "^fuse:" /etc/group >/dev/null 2>/dev/null ; then
diff --git a/tools/arvbox/lib/arvbox/docker/devenv.sh b/tools/arvbox/lib/arvbox/docker/devenv.sh
new file mode 100755 (executable)
index 0000000..9ab3ac4
--- /dev/null
@@ -0,0 +1,12 @@
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+flock /var/lib/arvados/createusers.lock /usr/local/lib/arvbox/createusers.sh
+
+if [[ -n "$*" ]] ; then
+    exec su --preserve-environment arvbox -c "$*"
+else
+    exec su --login arvbox
+fi
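
    devenv.sh serializes the createusers step with flock(1) so that concurrent
    invocations cannot race on user creation. The equivalent advisory lock in Python
    uses fcntl.flock; the lock-file path below mirrors the script but is otherwise
    illustrative:

    import fcntl
    import subprocess

    # Hold an exclusive advisory lock for the duration of the setup step,
    # mirroring `flock /var/lib/arvados/createusers.lock ...` in devenv.sh.
    with open("/var/lib/arvados/createusers.lock", "w") as lockfile:
        fcntl.flock(lockfile, fcntl.LOCK_EX)   # blocks until the lock is free
        subprocess.check_call(["/usr/local/lib/arvbox/createusers.sh"])
    # The lock is released automatically when the file is closed.
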