Merge branch 'master' into 14669-java-sdk-v2
author Peter Amstutz <pamstutz@veritasgenetics.com>
Wed, 27 Mar 2019 13:21:11 +0000 (09:21 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Wed, 27 Mar 2019 13:21:11 +0000 (09:21 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

75 files changed:
apps/workbench/app/controllers/container_requests_controller.rb
apps/workbench/app/helpers/application_helper.rb
build/package-build-dockerfiles/debian8/Dockerfile
build/package-test-dockerfiles/debian8/Dockerfile
build/run-library.sh
build/run-tests.sh
doc/user/cwl/bwa-mem/bwa-mem-input-mixed.yml [new file with mode: 0755]
doc/user/cwl/bwa-mem/bwa-mem-input-uuids.yml [new file with mode: 0755]
doc/user/cwl/cwl-runner.html.textile.liquid
lib/cloud/azure/azure.go
lib/controller/cmd.go
lib/controller/handler_test.go
lib/controller/proxy.go
lib/dispatchcloud/cmd.go
lib/dispatchcloud/container/queue.go
lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/driver.go
lib/dispatchcloud/scheduler/fix_stale_locks.go
lib/dispatchcloud/scheduler/interfaces.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/scheduler/sync.go
lib/dispatchcloud/test/queue.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/runner.go [new file with mode: 0644]
lib/dispatchcloud/worker/worker.go
lib/dispatchcloud/worker/worker_test.go
lib/service/cmd.go
lib/service/cmd_test.go [new file with mode: 0644]
lib/service/error.go [new file with mode: 0644]
sdk/cli/arvados-cli.gemspec
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/arvados_cwl/util.py
sdk/cwl/tests/submit_test_job_with_inconsistent_uuids.json [new file with mode: 0644]
sdk/cwl/tests/submit_test_job_with_mismatched_uuids.json [new file with mode: 0644]
sdk/cwl/tests/submit_test_job_with_uuids.json [new file with mode: 0644]
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_submit.py
sdk/go/arvados/client.go
sdk/go/arvados/config.go
sdk/go/keepclient/discover.go
sdk/ruby/arvados.gemspec
services/api/test/integration/groups_test.rb
services/crunch-run/background.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/crunchstat.py
services/fuse/setup.py
tools/arvbox/bin/arvbox
tools/arvbox/lib/arvbox/docker/Dockerfile.base
tools/arvbox/lib/arvbox/docker/Dockerfile.dev
tools/arvbox/lib/arvbox/docker/common.sh
tools/arvbox/lib/arvbox/docker/createusers.sh
tools/arvbox/lib/arvbox/docker/devenv.sh [new file with mode: 0755]
tools/arvbox/lib/arvbox/docker/service/nginx/run [changed from symlink to file mode: 0755]
tools/arvbox/lib/arvbox/docker/service/nginx/run-service [deleted file]
tools/crunchstat-summary/crunchstat_summary/command.py
tools/crunchstat-summary/crunchstat_summary/dygraphs.py
tools/crunchstat-summary/crunchstat_summary/reader.py
tools/crunchstat-summary/crunchstat_summary/summarizer.py
tools/crunchstat-summary/crunchstat_summary/webchart.py
tools/crunchstat-summary/tests/container_9tee4-dz642-lymtndkpy39eibk-arv-mount.txt.gz.report
tools/crunchstat-summary/tests/container_9tee4-dz642-lymtndkpy39eibk-crunchstat.txt.gz.report
tools/crunchstat-summary/tests/container_9tee4-dz642-lymtndkpy39eibk.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151204190335.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151210063411.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151210063439.txt.gz.report
tools/crunchstat-summary/tests/test_examples.py

index 783cafa117d9c23b3641f5f9883b90ea066be384..454be448d9d1e7afad061ac983cd38780abd1365 100644 (file)
@@ -120,7 +120,7 @@ class ContainerRequestsController < ApplicationController
             c = Collection.find(re[1])
             input_obj[param_id] = {"class" => primary_type,
                                    "location" => "keep:#{c.portable_data_hash}#{re[4]}",
-                                   "arv:collection" => input_obj[param_id]}
+                                   "http://arvados.org/cwl#collectionUUID" => re[1]}
           end
         end
       end
index 15bf77fa094f188e5b3f8be980c5c57e3d73bcfe..4c4b5ff34df52c471fa2ceaf566e8f9a5b606d02 100644 (file)
@@ -495,11 +495,12 @@ module ApplicationHelper
       chooser_title = "Choose a #{primary_type == 'Directory' ? 'dataset' : 'file'}:"
       selection_param = object.class.to_s.underscore + dn
       if attrvalue.is_a? Hash
-        display_value = attrvalue[:"arv:collection"] || attrvalue[:location]
+        display_value = attrvalue[:"http://arvados.org/cwl#collectionUUID"] || attrvalue[:"arv:collection"] || attrvalue[:location]
         re = CollectionsHelper.match_uuid_with_optional_filepath(display_value)
+        locationre = CollectionsHelper.match(attrvalue[:location][5..-1])
         if re
-          if re[4]
-            display_value = "#{Collection.find(re[1]).name} / #{re[4][1..-1]}"
+          if locationre and locationre[4]
+            display_value = "#{Collection.find(re[1]).name} / #{locationre[4][1..-1]}"
           else
             display_value = Collection.find(re[1]).name
           end
@@ -677,8 +678,8 @@ module ApplicationHelper
     render_runtime duration, use_words, round_to_min
   end
 
-  # Keep locators are expected to be of the form \"...<pdh/file_path>\"
-  JSON_KEEP_LOCATOR_REGEXP = /([0-9a-f]{32}\+\d+[^'"]*?)(?=['"]|\z|$)/
+  # Keep locators are expected to be of the form \"...<pdh/file_path>\" or \"...<uuid/file_path>\"
+  JSON_KEEP_LOCATOR_REGEXP = /([0-9a-f]{32}\+\d+[^'"]*|[a-z0-9]{5}-4zz18-[a-z0-9]{15}[^'"]*)(?=['"]|\z|$)/
   def keep_locator_in_json str
     # Return a list of all matches
     str.scan(JSON_KEEP_LOCATOR_REGEXP).flatten
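 
The broadened JSON_KEEP_LOCATOR_REGEXP accepts collection-UUID locators (xxxxx-4zz18-...) in addition to portable-data-hash locators. As a rough shell illustration of the new UUID branch (not part of the patch; the sample JSON fragment is made up, and the grep pattern is simplified to exclude only double quotes):

    echo '"location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/19.fasta.bwt"' \
      | grep -oE '[a-z0-9]{5}-4zz18-[a-z0-9]{15}[^"]*'
    # prints: zzzzz-4zz18-zzzzzzzzzzzzzzz/19.fasta.bwt
 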
index 3f591cdfa14aceab1ff1b4be1c650192d396131f..ec7ae07d826e440cf653dbc0ebcfb8b5bca255ca 100644 (file)
@@ -5,6 +5,8 @@
 FROM debian:jessie
 MAINTAINER Ward Vandewege <ward@curoverse.com>
 
+RUN perl -ni~ -e 'print unless /jessie-updates/' /etc/apt/sources.list
+
 ENV DEBIAN_FRONTEND noninteractive
 
 # Install dependencies.
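 
The new RUN line strips jessie-updates entries out of sources.list before any apt-get runs (the jessie-updates suite had been dropped from the Debian mirrors around this time, so leaving it in breaks apt-get update). A standalone sketch of the perl -ni~ in-place filter, using a made-up sources.list rather than the image's real one:

    printf '%s\n' \
        'deb http://deb.debian.org/debian jessie main' \
        'deb http://deb.debian.org/debian jessie-updates main' > /tmp/sources.list
    perl -ni~ -e 'print unless /jessie-updates/' /tmp/sources.list
    cat /tmp/sources.list     # only the "jessie main" line survives
    cat /tmp/sources.list~    # the original is kept as a backup with a ~ suffix
 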
index 2168f725a1a5a3d9cf7fcbfafb50b28733870f79..82d679abfba0e7321d9229280ad1e1d10995d5c1 100644 (file)
@@ -5,6 +5,8 @@
 FROM debian:8
 MAINTAINER Ward Vandewege <wvandewege@veritasgenetics.com>
 
+RUN perl -ni~ -e 'print unless /jessie-updates/' /etc/apt/sources.list
+
 ENV DEBIAN_FRONTEND noninteractive
 
 # Install dependencies
index de9d67d41cbe3f94a6bb9ca1c9ac1a819051dc35..1daceff2393537485d0dd51f381648076f43d512 100755 (executable)
@@ -826,14 +826,13 @@ install_package() {
   fi
 }
 
-title () {
-    txt="********** $1 **********"
-    printf "\n%*s%s\n\n" $((($COLUMNS-${#txt})/2)) "" "$txt"
+title() {
+    printf '%s %s\n' "=======" "$1"
 }
 
 checkexit() {
     if [[ "$1" != "0" ]]; then
-        title "!!!!!! $2 FAILED !!!!!!"
+        title "$2 -- FAILED"
         failures+=("$2 (`timer`)")
     else
         successes+=("$2 (`timer`)")
@@ -856,7 +855,9 @@ report_outcomes() {
 
     if [[ ${#failures[@]} == 0 ]]
     then
-        echo "All test suites passed."
+        if [[ ${#successes[@]} != 0 ]]; then
+           echo "All test suites passed."
+        fi
     else
         echo "Failures (${#failures[@]}):"
         for x in "${failures[@]}"
index bfa26ec3275a547079d9bd035d65ebd68b709fe9..a4596bd23698565e121ae89bd9c4f193e5f27858 100755 (executable)
@@ -19,6 +19,10 @@ Syntax:
 Options:
 
 --skip FOO     Do not test the FOO component.
+--skip sanity  Skip initial dev environment sanity checks.
+--skip install Do not run any install steps. Just run tests.
+               You should provide GOPATH, GEMHOME, and VENVDIR options
+               from a previous invocation if you use this option.
 --only FOO     Do not test anything except the FOO component.
 --temp DIR     Install components and dependencies under DIR instead of
                making a new temporary directory. Implies --leave-temp.
@@ -27,11 +31,9 @@ Options:
                subsequent invocations.
 --repeat N     Repeat each install/test step until it succeeds N times.
 --retry        Prompt to retry if an install or test suite fails.
---skip-install Do not run any install steps. Just run tests.
-               You should provide GOPATH, GEMHOME, and VENVDIR options
-               from a previous invocation if you use this option.
 --only-install Run specific install step
 --short        Skip (or scale down) some slow tests.
+--interactive  Set up, then prompt for test/install steps to perform.
 WORKSPACE=path Arvados source tree to test.
 CONFIGSRC=path Dir with api server config files to copy into source tree.
                (If none given, leave config files alone in source tree.)
@@ -49,7 +51,7 @@ ARVADOS_DEBUG=1
 envvar=value   Set \$envvar to value. Primarily useful for WORKSPACE,
                *_test, and other examples shown above.
 
-Assuming --skip-install is not given, all components are installed
+Assuming "--skip install" is not given, all components are installed
 into \$GOPATH, \$VENVDIR, and \$GEMHOME before running any tests. Many
 test suites depend on other components being installed, and installing
 everything tends to be quicker than debugging dependencies.
@@ -83,6 +85,7 @@ lib/dispatchcloud/container
 lib/dispatchcloud/scheduler
 lib/dispatchcloud/ssh_executor
 lib/dispatchcloud/worker
+lib/service
 services/api
 services/arv-git-httpd
 services/crunchstat
@@ -122,6 +125,7 @@ sdk/R
 sdk/java-v2
 tools/sync-groups
 tools/crunchstat-summary
+tools/crunchstat-summary:py3
 tools/keep-exercise
 tools/keep-rsync
 tools/keep-block-check
@@ -170,7 +174,9 @@ fatal() {
 
 exit_cleanly() {
     trap - INT
-    create-plot-data-from-log.sh $BUILD_NUMBER "$WORKSPACE/apps/workbench/log/test.log" "$WORKSPACE/apps/workbench/log/"
+    if which create-plot-data-from-log.sh >/dev/null; then
+        create-plot-data-from-log.sh $BUILD_NUMBER "$WORKSPACE/apps/workbench/log/test.log" "$WORKSPACE/apps/workbench/log/"
+    fi
     rotate_logfile "$WORKSPACE/apps/workbench/log/" "test.log"
     stop_services
     rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
@@ -180,6 +186,7 @@ exit_cleanly() {
 }
 
 sanity_checks() {
+    [[ -n "${skip[sanity]}" ]] && return 0
     ( [[ -n "$WORKSPACE" ]] && [[ -d "$WORKSPACE/services" ]] ) \
         || fatal "WORKSPACE environment variable not set to a source directory (see: $0 --help)"
     echo Checking dependencies:
@@ -304,8 +311,11 @@ do
         --short)
             short=1
             ;;
+        --interactive)
+            interactive=1
+            ;;
         --skip-install)
-            only_install=nothing
+            skip[install]=1
             ;;
         --only-install)
             only_install="$1"; shift
@@ -356,6 +366,10 @@ if [[ $NEED_SDK_R == false ]]; then
 fi
 
 start_services() {
+    if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
+        return 0
+    fi
+    . "$VENVDIR/bin/activate"
     echo 'Starting API, keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
     if [[ ! -d "$WORKSPACE/services/api/log" ]]; then
        mkdir -p "$WORKSPACE/services/api/log"
@@ -364,8 +378,10 @@ start_services() {
     if [[ -f "$WORKSPACE/tmp/api.pid" && ! -s "$WORKSPACE/tmp/api.pid" ]]; then
        rm -f "$WORKSPACE/tmp/api.pid"
     fi
+    all_services_stopped=
+    fail=0
     cd "$WORKSPACE" \
-        && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo fail=1) \
+        && eval $(python sdk/python/tests/run_test_server.py start --auth admin || echo "fail=1; false") \
         && export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
         && export ARVADOS_TEST_API_INSTALLED="$$" \
         && python sdk/python/tests/run_test_server.py start_controller \
@@ -373,18 +389,22 @@ start_services() {
         && python sdk/python/tests/run_test_server.py start_keep-web \
         && python sdk/python/tests/run_test_server.py start_arv-git-httpd \
         && python sdk/python/tests/run_test_server.py start_ws \
-        && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo fail=1) \
-        && (env | egrep ^ARVADOS)
-    if [[ -n "$fail" ]]; then
-       return 1
+        && eval $(python sdk/python/tests/run_test_server.py start_nginx || echo "fail=1; false") \
+        && (env | egrep ^ARVADOS) \
+        || fail=1
+    deactivate
+    if [[ $fail != 0 ]]; then
+        unset ARVADOS_TEST_API_HOST
     fi
+    return $fail
 }
 
 stop_services() {
-    if [[ -z "$ARVADOS_TEST_API_HOST" ]]; then
+    if [[ -n "$all_services_stopped" ]]; then
         return
     fi
     unset ARVADOS_TEST_API_HOST
+    . "$VENVDIR/bin/activate" || return
     cd "$WORKSPACE" \
         && python sdk/python/tests/run_test_server.py stop_nginx \
         && python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
@@ -392,7 +412,9 @@ stop_services() {
         && python sdk/python/tests/run_test_server.py stop_keep-web \
         && python sdk/python/tests/run_test_server.py stop_keep_proxy \
         && python sdk/python/tests/run_test_server.py stop_controller \
-        && python sdk/python/tests/run_test_server.py stop
+        && python sdk/python/tests/run_test_server.py stop \
+        && all_services_stopped=1
+    deactivate
 }
 
 interrupt() {
@@ -401,36 +423,6 @@ interrupt() {
 }
 trap interrupt INT
 
-sanity_checks
-
-echo "WORKSPACE=$WORKSPACE"
-
-if [[ -z "$CONFIGSRC" ]] && [[ -d "$HOME/arvados-api-server" ]]; then
-    # Jenkins expects us to use this by default.
-    CONFIGSRC="$HOME/arvados-api-server"
-fi
-
-# Clean up .pyc files that may exist in the workspace
-cd "$WORKSPACE"
-find -name '*.pyc' -delete
-
-if [[ -z "$temp" ]]; then
-    temp="$(mktemp -d)"
-fi
-
-# Set up temporary install dirs (unless existing dirs were supplied)
-for tmpdir in VENVDIR VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
-do
-    if [[ -z "${!tmpdir}" ]]; then
-        eval "$tmpdir"="$temp/$tmpdir"
-    fi
-    if ! [[ -d "${!tmpdir}" ]]; then
-        mkdir "${!tmpdir}" || fatal "can't create ${!tmpdir} (does $temp exist?)"
-    fi
-done
-
-rm -vf "${WORKSPACE}/tmp/*.log"
-
 setup_ruby_environment() {
     if [[ -s "$HOME/.rvm/scripts/rvm" ]] ; then
         source "$HOME/.rvm/scripts/rvm"
@@ -529,114 +521,136 @@ setup_virtualenv() {
     "$venvdest/bin/pip" install --no-cache-dir 'mock>=1.0' 'pbr<1.7.0'
 }
 
-export PERLINSTALLBASE
-export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
+initialize() {
+    sanity_checks
 
-export R_LIBS
+    echo "WORKSPACE=$WORKSPACE"
 
-export GOPATH
-(
-    set -e
-    mkdir -p "$GOPATH/src/git.curoverse.com"
-    if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
-        for d in \
-            "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
-                "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
-                "$GOPATH/src/git.curoverse.com/arvados.git"; do
-            [[ -d "$d" ]] && rmdir "$d"
-        done
+    if [[ -z "$CONFIGSRC" ]] && [[ -d "$HOME/arvados-api-server" ]]; then
+        # Jenkins expects us to use this by default.
+        CONFIGSRC="$HOME/arvados-api-server"
     fi
-    for d in \
-        "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
-        "$GOPATH/src/git.curoverse.com/arvados.git"; do
-        [[ -h "$d" ]] && rm "$d"
-    done
-    ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
-    go get -v github.com/kardianos/govendor
-    cd "$GOPATH/src/git.curoverse.com/arvados.git"
-    if [[ -n "$short" ]]; then
-        go get -v -d ...
-        "$GOPATH/bin/govendor" sync
-    else
-        # Remove cached source dirs in workdir. Otherwise, they will
-        # not qualify as +missing or +external below, and we won't be
-        # able to detect that they're missing from vendor/vendor.json.
-        rm -rf vendor/*/
-        go get -v -d ...
-        "$GOPATH/bin/govendor" sync
-        [[ -z $("$GOPATH/bin/govendor" list +unused +missing +external | tee /dev/stderr) ]] \
-            || fatal "vendor/vendor.json has unused or missing dependencies -- try:
 
-(export GOPATH=\"${GOPATH}\"; cd \$GOPATH/src/git.curoverse.com/arvados.git && \$GOPATH/bin/govendor add +missing +external && \$GOPATH/bin/govendor remove +unused)
+    # Clean up .pyc files that may exist in the workspace
+    cd "$WORKSPACE"
+    find -name '*.pyc' -delete
 
-";
+    if [[ -z "$temp" ]]; then
+        temp="$(mktemp -d)"
     fi
-) || fatal "Go setup failed"
 
-setup_virtualenv "$VENVDIR" --python python2.7
-. "$VENVDIR/bin/activate"
+    # Set up temporary install dirs (unless existing dirs were supplied)
+    for tmpdir in VENVDIR VENV3DIR GOPATH GEMHOME PERLINSTALLBASE R_LIBS
+    do
+        if [[ -z "${!tmpdir}" ]]; then
+            eval "$tmpdir"="$temp/$tmpdir"
+        fi
+        if ! [[ -d "${!tmpdir}" ]]; then
+            mkdir "${!tmpdir}" || fatal "can't create ${!tmpdir} (does $temp exist?)"
+        fi
+    done
 
-# Needed for run_test_server.py which is used by certain (non-Python) tests.
-pip install --no-cache-dir PyYAML \
-    || fatal "pip install PyYAML failed"
+    rm -vf "${WORKSPACE}/tmp/*.log"
 
-# Preinstall libcloud if using a fork; otherwise nodemanager "pip
-# install" won't pick it up by default.
-if [[ -n "$LIBCLOUD_PIN_SRC" ]]; then
-    pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
-        || pip install --pre --ignore-installed --no-cache-dir "$LIBCLOUD_PIN_SRC" >/dev/null \
-        || fatal "pip install apache-libcloud failed"
-fi
+    export PERLINSTALLBASE
+    export PERL5LIB="$PERLINSTALLBASE/lib/perl5${PERL5LIB:+:$PERL5LIB}"
 
-# Deactivate Python 2 virtualenv
-deactivate
+    export R_LIBS
 
-declare -a pythonstuff
-pythonstuff=(
-    sdk/pam
-    sdk/python
-    sdk/python:py3
-    sdk/cwl
-    sdk/cwl:py3
-    services/dockercleaner:py3
-    services/fuse
-    services/nodemanager
-    tools/crunchstat-summary
-    )
+    export GOPATH
 
-# If Python 3 is available, set up its virtualenv in $VENV3DIR.
-# Otherwise, skip dependent tests.
-PYTHON3=$(which python3)
-if [[ ${?} = 0 ]]; then
-    setup_virtualenv "$VENV3DIR" --python python3
-else
-    PYTHON3=
-    cat >&2 <<EOF
+    # Jenkins config requires that glob tmp/*.log match something. Ensure
+    # that happens even if we don't end up running services that set up
+    # logging.
+    mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
+    touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
 
-Warning: python3 could not be found. Python 3 tests will be skipped.
+    unset http_proxy https_proxy no_proxy
 
-EOF
-fi
 
-# Reactivate Python 2 virtualenv
-. "$VENVDIR/bin/activate"
+    # Note: this must be the last time we change PATH, otherwise rvm will
+    # whine a lot.
+    setup_ruby_environment
+
+    echo "PATH is $PATH"
+}
+
+install_env() {
+    (
+        set -e
+        mkdir -p "$GOPATH/src/git.curoverse.com"
+        if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
+            for d in \
+                "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
+                    "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
+                    "$GOPATH/src/git.curoverse.com/arvados.git"; do
+                [[ -d "$d" ]] && rmdir "$d"
+            done
+        fi
+        for d in \
+            "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
+                "$GOPATH/src/git.curoverse.com/arvados.git"; do
+            [[ -h "$d" ]] && rm "$d"
+        done
+        ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+        go get -v github.com/kardianos/govendor
+        cd "$GOPATH/src/git.curoverse.com/arvados.git"
+        if [[ -n "$short" ]]; then
+            go get -v -d ...
+            "$GOPATH/bin/govendor" sync
+        else
+            # Remove cached source dirs in workdir. Otherwise, they will
+            # not qualify as +missing or +external below, and we won't be
+            # able to detect that they're missing from vendor/vendor.json.
+            rm -rf vendor/*/
+            go get -v -d ...
+            "$GOPATH/bin/govendor" sync
+            [[ -z $("$GOPATH/bin/govendor" list +unused +missing +external | tee /dev/stderr) ]] \
+                || fatal "vendor/vendor.json has unused or missing dependencies -- try:
+
+(export GOPATH=\"${GOPATH}\"; cd \$GOPATH/src/git.curoverse.com/arvados.git && \$GOPATH/bin/govendor add +missing +external && \$GOPATH/bin/govendor remove +unused)
 
-# Note: this must be the last time we change PATH, otherwise rvm will
-# whine a lot.
-setup_ruby_environment
+";
+        fi
+    ) || fatal "Go setup failed"
 
-echo "PATH is $PATH"
+    setup_virtualenv "$VENVDIR" --python python2.7
+    . "$VENVDIR/bin/activate"
 
-if ! which bundler >/dev/null
-then
-    gem install --user-install bundler || fatal 'Could not install bundler'
-fi
+    # Needed for run_test_server.py which is used by certain (non-Python) tests.
+    pip install --no-cache-dir PyYAML \
+        || fatal "pip install PyYAML failed"
+
+    # Preinstall libcloud if using a fork; otherwise nodemanager "pip
+    # install" won't pick it up by default.
+    if [[ -n "$LIBCLOUD_PIN_SRC" ]]; then
+        pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
+            || pip install --pre --ignore-installed --no-cache-dir "$LIBCLOUD_PIN_SRC" >/dev/null \
+            || fatal "pip install apache-libcloud failed"
+    fi
 
-# Jenkins config requires that glob tmp/*.log match something. Ensure
-# that happens even if we don't end up running services that set up
-# logging.
-mkdir -p "${WORKSPACE}/tmp/" || fatal "could not mkdir ${WORKSPACE}/tmp"
-touch "${WORKSPACE}/tmp/controller.log" || fatal "could not touch ${WORKSPACE}/tmp/controller.log"
+    # Deactivate Python 2 virtualenv
+    deactivate
+
+    # If Python 3 is available, set up its virtualenv in $VENV3DIR.
+    # Otherwise, skip dependent tests.
+    PYTHON3=$(which python3)
+    if [[ ${?} = 0 ]]; then
+        setup_virtualenv "$VENV3DIR" --python python3
+    else
+        PYTHON3=
+        cat >&2 <<EOF
+
+Warning: python3 could not be found. Python 3 tests will be skipped.
+
+EOF
+    fi
+
+    if ! which bundler >/dev/null
+    then
+        gem install --user-install bundler || fatal 'Could not install bundler'
+    fi
+}
 
 retry() {
     remain="${repeat}"
@@ -645,7 +659,7 @@ retry() {
         if ${@}; then
             if [[ "$remain" -gt 1 ]]; then
                 remain=$((${remain}-1))
-                title "Repeating ${remain} more times"
+                title "(repeating ${remain} more times)"
             else
                 break
             fi
@@ -673,22 +687,39 @@ do_test() {
             suite="${1}"
             ;;
     esac
-    if [[ -z "${skip[$suite]}" && -z "${skip[$1]}" && \
-              (${#only[@]} -eq 0 || ${only[$suite]} -eq 1 || \
-                   ${only[$1]} -eq 1) ||
-                  ${only[$2]} -eq 1 ]]; then
-        retry do_test_once ${@}
-    else
-        title "Skipping ${1} tests"
+    if [[ -n "${skip[$suite]}" || \
+              -n "${skip[$1]}" || \
+              (${#only[@]} -ne 0 && ${only[$suite]} -eq 0 && ${only[$1]} -eq 0) ]]; then
+        return 0
     fi
+    case "${1}" in
+        services/api)
+            stop_services
+            ;;
+        doc | lib/cli | lib/cloud/azure | lib/cloud/ec2 | lib/cmd | lib/dispatchcloud/ssh_executor | lib/dispatchcloud/worker)
+            # don't care whether services are running
+            ;;
+        *)
+            if ! start_services; then
+                title "test $1 -- failed to start services"
+                return 1
+            fi
+            ;;
+    esac
+    retry do_test_once ${@}
 }
 
 do_test_once() {
     unset result
 
-    title "Running $1 tests"
+    title "test $1"
     timer_reset
-    if [[ "$2" == "go" ]]
+
+    if which deactivate >/dev/null; then deactivate; fi
+    if ! . "$VENVDIR/bin/activate"
+    then
+        result=1
+    elif [[ "$2" == "go" ]]
     then
         covername="coverage-$(echo "$1" | sed -e 's/\//_/g')"
         coverflags=("-covermode=count" "-coverprofile=$WORKSPACE/tmp/.$covername.tmp")
@@ -741,28 +772,25 @@ do_test_once() {
     fi
     result=${result:-$?}
     checkexit $result "$1 tests"
-    title "End of $1 tests (`timer`)"
+    title "test $1 -- `timer`"
     return $result
 }
 
 do_install() {
-  skipit=false
-
-  if [[ -z "${only_install}" || "${only_install}" == "${1}" || "${only_install}" == "${2}" ]]; then
-      retry do_install_once ${@}
-  else
-      skipit=true
-  fi
-
-  if [[ "$skipit" = true ]]; then
-    title "Skipping $1 install"
-  fi
+    if [[ -n "${skip[install]}" || ( -n "${only_install}" && "${only_install}" != "${1}" && "${only_install}" != "${2}" ) ]]; then
+        return 0
+    fi
+    retry do_install_once ${@}
 }
 
 do_install_once() {
-    title "Running $1 install"
+    title "install $1"
     timer_reset
-    if [[ "$2" == "go" ]]
+
+    if which deactivate >/dev/null; then deactivate; fi
+    if [[ "$1" != "env" ]] && ! . "$VENVDIR/bin/activate"; then
+        result=1
+    elif [[ "$2" == "go" ]]
     then
         go get -ldflags "-X main.version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}" -t "git.curoverse.com/arvados.git/$1"
     elif [[ "$2" == "pip" ]]
@@ -790,9 +818,9 @@ do_install_once() {
     else
         "install_$1"
     fi
-    result=$?
+    result=${result:-$?}
     checkexit $result "$1 install"
-    title "End of $1 install (`timer`)"
+    title "install $1 -- `timer`"
     return $result
 }
 
@@ -813,7 +841,6 @@ install_doc() {
         && bundle_install_trylocal \
         && rm -rf .site
 }
-do_install doc
 
 install_gem() {
     gemname=$1
@@ -825,56 +852,32 @@ install_gem() {
         && with_test_gemset gem install --no-ri --no-rdoc $(ls -t "$gemname"-*.gem|head -n1)
 }
 
-install_ruby_sdk() {
+install_sdk/ruby() {
     install_gem arvados sdk/ruby
 }
-do_install sdk/ruby ruby_sdk
 
-install_R_sdk() {
+install_sdk/R() {
   if [[ "$NEED_SDK_R" = true ]]; then
     cd "$WORKSPACE/sdk/R" \
        && Rscript --vanilla install_deps.R
   fi
 }
-do_install sdk/R R_sdk
 
-install_perl_sdk() {
+install_sdk/perl() {
     cd "$WORKSPACE/sdk/perl" \
         && perl Makefile.PL INSTALL_BASE="$PERLINSTALLBASE" \
         && make install INSTALLDIRS=perl
 }
-do_install sdk/perl perl_sdk
 
-install_cli() {
+install_sdk/cli() {
     install_gem arvados-cli sdk/cli
 }
-do_install sdk/cli cli
 
-install_login-sync() {
+install_services/login-sync() {
     install_gem arvados-login-sync services/login-sync
 }
-do_install services/login-sync login-sync
-
-# Install the Python SDK early. Various other test suites (like
-# keepproxy) bring up run_test_server.py, which imports the arvados
-# module. We can't actually *test* the Python SDK yet though, because
-# its own test suite brings up some of those other programs (like
-# keepproxy).
-for p in "${pythonstuff[@]}"
-do
-    dir=${p%:py3}
-    if [[ ${dir} = ${p} ]]; then
-        if [[ -z ${skip[python2]} ]]; then
-            do_install ${dir} pip
-        fi
-    elif [[ -n ${PYTHON3} ]]; then
-        if [[ -z ${skip[python3]} ]]; then
-            do_install ${dir} pip "$VENV3DIR/bin/"
-        fi
-    fi
-done
 
-install_apiserver() {
+install_services/api() {
     cd "$WORKSPACE/services/api" \
         && RAILS_ENV=test bundle_install_trylocal
 
@@ -922,7 +925,19 @@ install_apiserver() {
         && RAILS_ENV=test bundle exec rake db:setup \
         && RAILS_ENV=test bundle exec rake db:fixtures:load
 }
-do_install services/api apiserver
+
+declare -a pythonstuff
+pythonstuff=(
+    sdk/pam
+    sdk/python
+    sdk/python:py3
+    sdk/cwl
+    sdk/cwl:py3
+    services/dockercleaner:py3
+    services/fuse
+    services/nodemanager
+    tools/crunchstat-summary
+)
 
 declare -a gostuff
 gostuff=(
@@ -940,6 +955,7 @@ gostuff=(
     lib/dispatchcloud/scheduler
     lib/dispatchcloud/ssh_executor
     lib/dispatchcloud/worker
+    lib/service
     sdk/go/arvados
     sdk/go/arvadosclient
     sdk/go/auth
@@ -968,22 +984,15 @@ gostuff=(
     tools/keep-rsync
     tools/sync-groups
 )
-for g in "${gostuff[@]}"
-do
-    do_install "$g" go
-done
 
-install_workbench() {
+install_apps/workbench() {
     cd "$WORKSPACE/apps/workbench" \
         && mkdir -p tmp/cache \
         && RAILS_ENV=test bundle_install_trylocal \
         && RAILS_ENV=test RAILS_GROUPS=assets bundle exec rake npm:install
 }
-do_install apps/workbench workbench
 
-unset http_proxy https_proxy no_proxy
-
-test_doclinkchecker() {
+test_doc() {
     (
         set -e
         cd "$WORKSPACE/doc"
@@ -993,113 +1002,244 @@ test_doclinkchecker() {
         PYTHONPATH=$WORKSPACE/sdk/python/ bundle exec rake linkchecker baseurl=file://$WORKSPACE/doc/.site/ arvados_workbench_host=https://workbench.$ARVADOS_API_HOST arvados_api_host=$ARVADOS_API_HOST
     )
 }
-do_test doc doclinkchecker
-
-stop_services
 
-test_apiserver() {
+test_services/api() {
     rm -f "$WORKSPACE/services/api/git-commit.version"
     cd "$WORKSPACE/services/api" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test TESTOPTS=-v ${testargs[services/api]}
 }
-do_test services/api apiserver
-
-# Shortcut for when we're only running apiserver tests. This saves a bit of time,
-# because we don't need to start up the api server for subsequent tests.
-if [ ! -z "$only" ] && [ "$only" == "services/api" ]; then
-  rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
-  exit_cleanly
-fi
-
-start_services || { stop_services; fatal "start_services"; }
 
-test_ruby_sdk() {
+test_sdk/ruby() {
     cd "$WORKSPACE/sdk/ruby" \
         && bundle exec rake test TESTOPTS=-v ${testargs[sdk/ruby]}
 }
-do_test sdk/ruby ruby_sdk
 
-test_R_sdk() {
+test_sdk/R() {
   if [[ "$NEED_SDK_R" = true ]]; then
     cd "$WORKSPACE/sdk/R" \
         && Rscript --vanilla run_test.R
   fi
 }
 
-do_test sdk/R R_sdk
-
-test_cli() {
+test_sdk/cli() {
     cd "$WORKSPACE/sdk/cli" \
         && mkdir -p /tmp/keep \
         && KEEP_LOCAL_STORE=/tmp/keep bundle exec rake test TESTOPTS=-v ${testargs[sdk/cli]}
 }
-do_test sdk/cli cli
 
-test_java_v2_sdk() {
+test_sdk/java-v2() {
     cd "$WORKSPACE/sdk/java-v2" && ./gradlew test
 }
-do_test sdk/java-v2 java_v2_sdk
 
-test_login-sync() {
+test_services/login-sync() {
     cd "$WORKSPACE/services/login-sync" \
         && bundle exec rake test TESTOPTS=-v ${testargs[services/login-sync]}
 }
-do_test services/login-sync login-sync
 
-test_nodemanager_integration() {
+test_services/nodemanager_integration() {
     cd "$WORKSPACE/services/nodemanager" \
         && tests/integration_test.py ${testargs[services/nodemanager_integration]}
 }
-do_test services/nodemanager_integration nodemanager_integration
 
-for p in "${pythonstuff[@]}"
-do
-    dir=${p%:py3}
-    if [[ ${dir} = ${p} ]]; then
-        if [[ -z ${skip[python2]} ]]; then
-            do_test ${dir} pip
-        fi
-    elif [[ -n ${PYTHON3} ]]; then
-        if [[ -z ${skip[python3]} ]]; then
-            do_test ${dir} pip "$VENV3DIR/bin/"
-        fi
-    fi
-done
-
-for g in "${gostuff[@]}"
-do
-    do_test "$g" go
-done
-
-test_workbench_units() {
+test_apps/workbench_units() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:units TESTOPTS=-v ${testargs[apps/workbench]}
 }
-do_test apps/workbench_units workbench_units
 
-test_workbench_functionals() {
+test_apps/workbench_functionals() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:functionals TESTOPTS=-v ${testargs[apps/workbench]}
 }
-do_test apps/workbench_functionals workbench_functionals
 
-test_workbench_integration() {
+test_apps/workbench_integration() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:integration TESTOPTS=-v ${testargs[apps/workbench]}
 }
-do_test apps/workbench_integration workbench_integration
-
 
-test_workbench_benchmark() {
+test_apps/workbench_benchmark() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:benchmark ${testargs[apps/workbench_benchmark]}
 }
-do_test apps/workbench_benchmark workbench_benchmark
 
-test_workbench_profile() {
+test_apps/workbench_profile() {
     cd "$WORKSPACE/apps/workbench" \
         && env RAILS_ENV=test ${short:+RAILS_TEST_SHORT=1} bundle exec rake test:profile ${testargs[apps/workbench_profile]}
 }
-do_test apps/workbench_profile workbench_profile
 
+install_deps() {
+    # Install parts needed by test suites
+    do_install env
+    do_install cmd/arvados-server go
+    do_install sdk/cli
+    do_install sdk/perl
+    do_install sdk/python pip
+    do_install sdk/ruby
+    do_install services/api
+    do_install services/arv-git-httpd go
+    do_install services/keepproxy go
+    do_install services/keepstore go
+    do_install services/keep-web go
+    do_install services/ws go
+}
+
+install_all() {
+    do_install env
+    do_install doc
+    do_install sdk/ruby
+    do_install sdk/R
+    do_install sdk/perl
+    do_install sdk/cli
+    do_install services/login-sync
+    for p in "${pythonstuff[@]}"
+    do
+        dir=${p%:py3}
+        if [[ ${dir} = ${p} ]]; then
+            if [[ -z ${skip[python2]} ]]; then
+                do_install ${dir} pip
+            fi
+        elif [[ -n ${PYTHON3} ]]; then
+            if [[ -z ${skip[python3]} ]]; then
+                do_install ${dir} pip "$VENV3DIR/bin/"
+            fi
+        fi
+    done
+    do_install services/api
+    for g in "${gostuff[@]}"
+    do
+        do_install "$g" go
+    done
+    do_install apps/workbench
+}
+
+test_all() {
+    stop_services
+    do_test services/api
+
+    # Shortcut for when we're only running apiserver tests. This saves a bit of time,
+    # because we don't need to start up the api server for subsequent tests.
+    if [ ! -z "$only" ] && [ "$only" == "services/api" ]; then
+        rotate_logfile "$WORKSPACE/services/api/log/" "test.log"
+        exit_cleanly
+    fi
+
+    do_test doc
+    do_test sdk/ruby
+    do_test sdk/R
+    do_test sdk/cli
+    do_test services/login-sync
+    do_test sdk/java-v2
+    do_test services/nodemanager_integration
+    for p in "${pythonstuff[@]}"
+    do
+        dir=${p%:py3}
+        if [[ ${dir} = ${p} ]]; then
+            if [[ -z ${skip[python2]} ]]; then
+                do_test ${dir} pip
+            fi
+        elif [[ -n ${PYTHON3} ]]; then
+            if [[ -z ${skip[python3]} ]]; then
+                do_test ${dir} pip "$VENV3DIR/bin/"
+            fi
+        fi
+    done
+
+    for g in "${gostuff[@]}"
+    do
+        do_test "$g" go
+    done
+    do_test apps/workbench_units
+    do_test apps/workbench_functionals
+    do_test apps/workbench_integration
+    do_test apps/workbench_benchmark
+    do_test apps/workbench_profile
+}
+
+help_interactive() {
+    echo "== Interactive commands:"
+    echo "TARGET                 (short for 'test DIR')"
+    echo "test TARGET"
+    echo "test TARGET:py3        (test with python3)"
+    echo "test TARGET -check.vv  (pass arguments to test)"
+    echo "install TARGET"
+    echo "install env            (go/python libs)"
+    echo "install deps           (go/python libs + arvados components needed for integration tests)"
+    echo "reset                  (...services used by integration tests)"
+    echo "exit"
+    echo "== Test targets:"
+    echo "${!testfuncargs[@]}" | tr ' ' '\n' | sort | column
+}
+
+initialize
+
+declare -A testfuncargs=()
+for g in "${gostuff[@]}"; do
+    testfuncargs[$g]="$g go"
+done
+for p in "${pythonstuff[@]}"; do
+    dir=${p%:py3}
+    if [[ ${dir} = ${p} ]]; then
+        testfuncargs[$p]="$dir pip $VENVDIR/bin/"
+    else
+        testfuncargs[$p]="$dir pip $VENV3DIR/bin/"
+    fi
+done
+
+if [[ -z ${interactive} ]]; then
+    install_all
+    test_all
+else
+    skip=()
+    only=()
+    only_install=()
+    if [[ -e "$VENVDIR/bin/activate" ]]; then stop_services; fi
+    setnextcmd() {
+        if [[ "$nextcmd" != "install deps" ]]; then
+            :
+        elif [[ -e "$VENVDIR/bin/activate" ]]; then
+            nextcmd="test lib/cmd"
+        else
+            nextcmd="install deps"
+        fi
+    }
+    echo
+    help_interactive
+    nextcmd="install deps"
+    setnextcmd
+    while read -p 'What next? ' -e -i "${nextcmd}" nextcmd; do
+        read verb target opts <<<"${nextcmd}"
+        case "${verb}" in
+            "" | "help")
+                help_interactive
+                ;;
+            "exit" | "quit")
+                exit_cleanly
+                ;;
+            "reset")
+                stop_services
+                ;;
+            *)
+                target="${target%/}"
+                testargs["$target"]="${opts}"
+                case "$target" in
+                    all | deps)
+                        ${verb}_${target}
+                        ;;
+                    *)
+                        tt="${testfuncargs[${target}]}"
+                        tt="${tt:-$target}"
+                        do_$verb $tt
+                        ;;
+                esac
+                ;;
+        esac
+        if [[ ${#successes[@]} -gt 0 || ${#failures[@]} -gt 0 ]]; then
+            report_outcomes
+            successes=()
+            failures=()
+        fi
+        cd "$WORKSPACE"
+        setnextcmd
+    done
+    echo
+fi
 exit_cleanly
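 
Putting the new run-tests.sh flags together, a typical re-run might look like the sketch below (the --temp directory and the chosen test suite are placeholders; WORKSPACE is passed the usual envvar=value way described in the help text):

    # First run: create the temporary GOPATH/virtualenv/gem dirs and run everything.
    ./build/run-tests.sh WORKSPACE=$PWD --temp /tmp/arvados-test

    # Later runs: reuse the same dirs, skip sanity checks and install steps,
    # and test a single component.
    ./build/run-tests.sh WORKSPACE=$PWD --temp /tmp/arvados-test \
        --skip sanity --skip install --only lib/dispatchcloud/worker

    # Or use the new interactive mode and drive install/test steps by hand.
    ./build/run-tests.sh WORKSPACE=$PWD --temp /tmp/arvados-test --interactive
 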
diff --git a/doc/user/cwl/bwa-mem/bwa-mem-input-mixed.yml b/doc/user/cwl/bwa-mem/bwa-mem-input-mixed.yml
new file mode 100755 (executable)
index 0000000..73bd9f5
--- /dev/null
@@ -0,0 +1,29 @@
+#!/usr/bin/env cwl-runner
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: CC-BY-SA-3.0
+
+# Example input file providing both content addresses and UUIDs.  The
+# collections identified by 'collectionUUID' are checked to ensure that the
+# current content of the collection record matches the content address
+# in the 'location' field.
+
+$namespaces:
+  arv: 'http://arvados.org/cwl#'
+
+cwl:tool: bwa-mem.cwl
+reference:
+  class: File
+  location: keep:2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt
+  arv:collectionUUID: qr1hi-4zz18-pwid4w22a40jp8l
+read_p1:
+  class: File
+  location: keep:ae480c5099b81e17267b7445e35b4bc7+180/HWI-ST1027_129_D0THKACXX.1_1.fastq
+  arv:collectionUUID: qr1hi-4zz18-h615rgfmqt3wje0
+read_p2:
+  class: File
+  location: keep:ae480c5099b81e17267b7445e35b4bc7+180/HWI-ST1027_129_D0THKACXX.1_2.fastq
+  arv:collectionUUID: qr1hi-4zz18-h615rgfmqt3wje0
+group_id: arvados_tutorial
+sample_id: HWI-ST1027_129
+PL: illumina
diff --git a/doc/user/cwl/bwa-mem/bwa-mem-input-uuids.yml b/doc/user/cwl/bwa-mem/bwa-mem-input-uuids.yml
new file mode 100755 (executable)
index 0000000..7e71e95
--- /dev/null
@@ -0,0 +1,21 @@
+#!/usr/bin/env cwl-runner
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: CC-BY-SA-3.0
+
+# Example input file using UUIDs to reference input collections. These
+# will be resolved to content addresses before running the workflow.
+
+cwl:tool: bwa-mem.cwl
+reference:
+  class: File
+  location: keep:qr1hi-4zz18-pwid4w22a40jp8l/19.fasta.bwt
+read_p1:
+  class: File
+  location: keep:qr1hi-4zz18-h615rgfmqt3wje0/HWI-ST1027_129_D0THKACXX.1_1.fastq
+read_p2:
+  class: File
+  location: keep:qr1hi-4zz18-h615rgfmqt3wje0/HWI-ST1027_129_D0THKACXX.1_2.fastq
+group_id: arvados_tutorial
+sample_id: HWI-ST1027_129
+PL: illumina
index ad5d3bd83643e6d9134dbfddddfdf2209be66140..fbce8e17beb7a874689f3de265394a73dcf8c2cb 100644 (file)
@@ -69,7 +69,7 @@ arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107,
 2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Overall process status is success
 {
     "aligned_sam": {
-        "path": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
+        "location": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
         "checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
         "class": "File",
         "size": 30738986
@@ -82,7 +82,7 @@ h3. Referencing files
 
 When running a workflow on an Arvados cluster, the input files must be stored in Keep.  There are several ways this can happen.
 
-A URI reference to Keep uses the @keep:@ scheme followed by the portable data hash, collection size, and path to the file inside the collection.  For example, @keep:2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt@.
+A URI reference to Keep uses the @keep:@ scheme followed by either the portable data hash or UUID of the collection and then the location of the file inside the collection.  For example, @keep:2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt@ or @keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/19.fasta.bwt@.
 
 If you reference a file in "arv-mount":{{site.baseurl}}/user/tutorials/tutorial-keep-mount.html, such as @/home/example/keep/by_id/2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt@, then @arvados-cwl-runner@ will automatically determine the appropriate Keep URI reference.
 
@@ -100,7 +100,7 @@ arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107,
 2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Overall process status is success
 {
     "aligned_sam": {
-        "path": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
+        "location": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
         "checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
         "class": "File",
         "size": 30738986
index d19e4bef2372ff87dfb13e26db1ffddb9726bdca..d37183fbdf5e5f4b7ec639a61be1abf960a4f200 100644 (file)
@@ -341,6 +341,10 @@ func (az *azureInstanceSet) Create(
        az.stopWg.Add(1)
        defer az.stopWg.Done()
 
+       if instanceType.AddedScratch > 0 {
+               return nil, fmt.Errorf("cannot create instance type %q: driver does not implement non-zero AddedScratch (%d)", instanceType.Name, instanceType.AddedScratch)
+       }
+
        name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
        if err != nil {
                return nil, err
@@ -647,7 +651,8 @@ func (ai *azureInstance) Destroy() error {
 func (ai *azureInstance) Address() string {
        if ai.nic.IPConfigurations != nil &&
                len(*ai.nic.IPConfigurations) > 0 &&
-               (*ai.nic.IPConfigurations)[0].PrivateIPAddress != nil {
+               (*ai.nic.IPConfigurations)[0].InterfaceIPConfigurationPropertiesFormat != nil &&
+               (*ai.nic.IPConfigurations)[0].InterfaceIPConfigurationPropertiesFormat.PrivateIPAddress != nil {
 
                return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
        }
index c1d4657ba47b7801b63bad0222e4a2df71f7881d..f0268091bedb58f412d4e93ba675481d99f5e3ef 100644 (file)
@@ -14,6 +14,6 @@ import (
 
 var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
 
-func newHandler(_ context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+func newHandler(_ context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile, _ string) service.Handler {
        return &Handler{Cluster: cluster, NodeProfile: np}
 }
index dfe60d90a5f3119909658149b1017f3b782515f3..96110ea85859b05b362f849475a9d77c91919752 100644 (file)
@@ -50,7 +50,7 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
                },
        }
        node := s.cluster.NodeProfiles["*"]
-       s.handler = newHandler(s.ctx, s.cluster, &node)
+       s.handler = newHandler(s.ctx, s.cluster, &node, "")
 }
 
 func (s *HandlerSuite) TearDownTest(c *check.C) {
index c01c152352e6b8f101179bf38add3b0574a00c5d..c0b94c2b5f76d604e738c2d9bc43d3a01f8bf5dc 100644 (file)
@@ -32,6 +32,7 @@ var dropHeaders = map[string]bool{
        "Keep-Alive":          true,
        "Proxy-Authenticate":  true,
        "Proxy-Authorization": true,
+       // this line makes gofmt 1.10 and 1.11 agree
        "TE":                true,
        "Trailer":           true,
        "Transfer-Encoding": true, // *-Encoding headers interfer with Go's automatic compression/decompression
index 7231e839475639c2aa5e6c720091c15b4d4b5ed7..22ceb8aebe787ae79c1274cc0c714bc39df04640 100644 (file)
@@ -6,6 +6,7 @@ package dispatchcloud
 
 import (
        "context"
+       "fmt"
 
        "git.curoverse.com/arvados.git/lib/cmd"
        "git.curoverse.com/arvados.git/lib/service"
@@ -14,8 +15,17 @@ import (
 
 var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
 
-func newHandler(ctx context.Context, cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
-       d := &dispatcher{Cluster: cluster, Context: ctx}
+func newHandler(ctx context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile, token string) service.Handler {
+       ac, err := arvados.NewClientFromConfig(cluster)
+       if err != nil {
+               return service.ErrorHandler(ctx, cluster, np, fmt.Errorf("error initializing client from cluster config: %s", err))
+       }
+       d := &dispatcher{
+               Cluster:   cluster,
+               Context:   ctx,
+               ArvClient: ac,
+               AuthToken: token,
+       }
        go d.Start()
        return d
 }
index bbe47625a893d6874d2c3c415952948f290de74f..af17aaf3927ce9f3b8b94a03ca289201c11640d2 100644 (file)
@@ -5,6 +5,7 @@
 package container
 
 import (
+       "errors"
        "io"
        "sync"
        "time"
@@ -52,7 +53,6 @@ func (c *QueueEnt) String() string {
 // cache up to date.
 type Queue struct {
        logger     logrus.FieldLogger
-       reg        *prometheus.Registry
        chooseType typeChooser
        client     APIClient
 
@@ -78,14 +78,17 @@ type Queue struct {
 // Arvados cluster's queue during Update, chooseType will be called to
 // assign an appropriate arvados.InstanceType for the queue entry.
 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
-       return &Queue{
+       cq := &Queue{
                logger:      logger,
-               reg:         reg,
                chooseType:  chooseType,
                client:      client,
                current:     map[string]QueueEnt{},
                subscribers: map[<-chan struct{}]chan struct{}{},
        }
+       if reg != nil {
+               go cq.runMetrics(reg)
+       }
+       return cq
 }
 
 // Subscribe returns a channel that becomes ready to receive when an
@@ -398,32 +401,62 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
        }
        apply(avail)
 
-       var missing []string
+       missing := map[string]bool{}
        cq.mtx.Lock()
        for uuid, ent := range cq.current {
                if next[uuid] == nil &&
                        ent.Container.State != arvados.ContainerStateCancelled &&
                        ent.Container.State != arvados.ContainerStateComplete {
-                       missing = append(missing, uuid)
+                       missing[uuid] = true
                }
        }
        cq.mtx.Unlock()
 
-       for i, page := 0, 20; i < len(missing); i += page {
-               batch := missing[i:]
-               if len(batch) > page {
-                       batch = batch[:page]
+       for len(missing) > 0 {
+               var batch []string
+               for uuid := range missing {
+                       batch = append(batch, uuid)
+                       if len(batch) == 20 {
+                               break
+                       }
                }
+               filters := []arvados.Filter{{"uuid", "in", batch}}
                ended, err := cq.fetchAll(arvados.ResourceListParams{
                        Select:  selectParam,
                        Order:   "uuid",
                        Count:   "none",
-                       Filters: []arvados.Filter{{"uuid", "in", batch}},
+                       Filters: filters,
                })
                if err != nil {
                        return nil, err
                }
                apply(ended)
+               if len(ended) == 0 {
+                       // This is the only case where we can conclude
+                       // a container has been deleted from the
+                       // database. A short (but non-zero) page, on
+                       // the other hand, can be caused by a response
+                       // size limit.
+                       for _, uuid := range batch {
+                               cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
+                               delete(missing, uuid)
+                               cq.mtx.Lock()
+                               cq.delEnt(uuid, cq.current[uuid].Container.State)
+                               cq.mtx.Unlock()
+                       }
+                       continue
+               }
+               for _, ctr := range ended {
+                       if _, ok := missing[ctr.UUID]; !ok {
+                               msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
+                               cq.logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": ctr.UUID,
+                                       "Filters":       filters,
+                               }).Error(msg)
+                               return nil, errors.New(msg)
+                       }
+                       delete(missing, ctr.UUID)
+               }
        }
        return next, nil
 }
@@ -456,3 +489,34 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.C
        }
        return results, nil
 }
+
+func (cq *Queue) runMetrics(reg *prometheus.Registry) {
+       mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "queue_entries",
+               Help:      "Number of active container entries in the controller database.",
+       }, []string{"state", "instance_type"})
+       reg.MustRegister(mEntries)
+
+       type entKey struct {
+               state arvados.ContainerState
+               inst  string
+       }
+       count := map[entKey]int{}
+
+       ch := cq.Subscribe()
+       defer cq.Unsubscribe(ch)
+       for range ch {
+               for k := range count {
+                       count[k] = 0
+               }
+               ents, _ := cq.Entries()
+               for _, ent := range ents {
+                       count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+               }
+               for k, v := range count {
+                       mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
+               }
+       }
+}
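 
The runMetrics goroutine added above exports a queue_entries gauge labeled by container state and instance type. A rough way to eyeball it, assuming the dispatcher serves Prometheus metrics at /metrics guarded by the cluster ManagementToken (neither detail is visible in this hunk, and the host below is a placeholder):

    curl -sS -H "Authorization: Bearer $MANAGEMENT_TOKEN" \
        https://dispatch.example.com/metrics \
      | grep '^arvados_dispatchcloud_queue_entries'
 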
index adf1028b35fe16ab13afbfcb4f0c91672ec17849..71ff9c784e958fa7927cb3ca57214593d74eecd7 100644 (file)
@@ -39,12 +39,15 @@ type pool interface {
        scheduler.WorkerPool
        Instances() []worker.InstanceView
        SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
+       KillInstance(id cloud.InstanceID, reason string) error
        Stop()
 }
 
 type dispatcher struct {
        Cluster       *arvados.Cluster
        Context       context.Context
+       ArvClient     *arvados.Client
+       AuthToken     string
        InstanceSetID cloud.InstanceSetID
 
        logger      logrus.FieldLogger
@@ -107,19 +110,21 @@ func (disp *dispatcher) setup() {
 }
 
 func (disp *dispatcher) initialize() {
-       arvClient := arvados.NewClientFromEnv()
+       disp.logger = ctxlog.FromContext(disp.Context)
+
+       disp.ArvClient.AuthToken = disp.AuthToken
+
        if disp.InstanceSetID == "" {
-               if strings.HasPrefix(arvClient.AuthToken, "v2/") {
-                       disp.InstanceSetID = cloud.InstanceSetID(strings.Split(arvClient.AuthToken, "/")[1])
+               if strings.HasPrefix(disp.AuthToken, "v2/") {
+                       disp.InstanceSetID = cloud.InstanceSetID(strings.Split(disp.AuthToken, "/")[1])
                } else {
                        // Use some other string unique to this token
                        // that doesn't reveal the token itself.
-                       disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(arvClient.AuthToken))))
+                       disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(disp.AuthToken))))
                }
        }
        disp.stop = make(chan struct{}, 1)
        disp.stopped = make(chan struct{})
-       disp.logger = ctxlog.FromContext(disp.Context)
 
        if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
                disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
@@ -133,8 +138,8 @@ func (disp *dispatcher) initialize() {
        }
        disp.instanceSet = instanceSet
        disp.reg = prometheus.NewRegistry()
-       disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
-       disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
+       disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
+       disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, disp.ArvClient)
 
        if disp.Cluster.ManagementToken == "" {
                disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -147,6 +152,7 @@ func (disp *dispatcher) initialize() {
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
+               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
                metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
                        ErrorLog: disp.logger,
                })
@@ -212,6 +218,20 @@ func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
        disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
 }
 
+// Management API: shutdown/destroy specified instance now.
+func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
+       id := cloud.InstanceID(r.FormValue("instance_id"))
+       if id == "" {
+               httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+               return
+       }
+       err := disp.pool.KillInstance(id, "via management API: "+r.FormValue("reason"))
+       if err != nil {
+               httpserver.Error(w, err.Error(), http.StatusNotFound)
+               return
+       }
+}
+
 func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
        id := cloud.InstanceID(r.FormValue("instance_id"))
        if id == "" {
index 36b06020748f43f5f4c7bbdefb5302935dedb861..00157b75c649226880898c802973e9cd03a82173 100644 (file)
@@ -17,6 +17,7 @@ import (
 
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "golang.org/x/crypto/ssh"
        check "gopkg.in/check.v1"
@@ -49,12 +50,13 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 
        s.cluster = &arvados.Cluster{
                CloudVMs: arvados.CloudVMs{
-                       Driver:          "test",
-                       SyncInterval:    arvados.Duration(10 * time.Millisecond),
-                       TimeoutIdle:     arvados.Duration(150 * time.Millisecond),
-                       TimeoutBooting:  arvados.Duration(150 * time.Millisecond),
-                       TimeoutProbe:    arvados.Duration(15 * time.Millisecond),
-                       TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+                       Driver:               "test",
+                       SyncInterval:         arvados.Duration(10 * time.Millisecond),
+                       TimeoutIdle:          arvados.Duration(150 * time.Millisecond),
+                       TimeoutBooting:       arvados.Duration(150 * time.Millisecond),
+                       TimeoutProbe:         arvados.Duration(15 * time.Millisecond),
+                       TimeoutShutdown:      arvados.Duration(5 * time.Millisecond),
+                       MaxCloudOpsPerSecond: 500,
                },
                Dispatch: arvados.Dispatch{
                        PrivateKey:         string(dispatchprivraw),
@@ -62,6 +64,8 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                        ProbeInterval:      arvados.Duration(5 * time.Millisecond),
                        StaleLockTimeout:   arvados.Duration(5 * time.Millisecond),
                        MaxProbesPerSecond: 1000,
+                       TimeoutSignal:      arvados.Duration(3 * time.Millisecond),
+                       TimeoutTERM:        arvados.Duration(20 * time.Millisecond),
                },
                InstanceTypes: arvados.InstanceTypeMap{
                        test.InstanceType(1).Name:  test.InstanceType(1),
@@ -78,10 +82,19 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                                DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
                        },
                },
+               Services: arvados.Services{
+                       Controller: arvados.Service{ExternalURL: arvados.URL{Scheme: "https", Host: os.Getenv("ARVADOS_API_HOST")}},
+               },
        }
+
+       arvClient, err := arvados.NewClientFromConfig(s.cluster)
+       c.Check(err, check.IsNil)
+
        s.disp = &dispatcher{
-               Cluster: s.cluster,
-               Context: s.ctx,
+               Cluster:   s.cluster,
+               Context:   s.ctx,
+               ArvClient: arvClient,
+               AuthToken: arvadostest.AdminToken,
        }
        // Test cases can modify s.cluster before calling
        // initialize(), and then modify private state before calling
@@ -124,17 +137,20 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        for _, ctr := range queue.Containers {
                waiting[ctr.UUID] = struct{}{}
        }
-       executeContainer := func(ctr arvados.Container) int {
+       finishContainer := func(ctr arvados.Container) {
                mtx.Lock()
                defer mtx.Unlock()
                if _, ok := waiting[ctr.UUID]; !ok {
-                       c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
-                       return 1
+                       c.Errorf("container completed twice: %s", ctr.UUID)
+                       return
                }
                delete(waiting, ctr.UUID)
                if len(waiting) == 0 {
                        close(done)
                }
+       }
+       executeContainer := func(ctr arvados.Container) int {
+               finishContainer(ctr)
                return int(rand.Uint32() & 0x3)
        }
        n := 0
@@ -144,11 +160,14 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
                stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
                stubvm.ExecuteContainer = executeContainer
+               stubvm.CrashRunningContainer = finishContainer
                switch n % 7 {
                case 0:
                        stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
                case 1:
                        stubvm.CrunchRunMissing = true
+               case 2:
+                       stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
                default:
                        stubvm.CrunchRunCrashRate = 0.1
                }
@@ -261,7 +280,13 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
        c.Check(ok, check.Equals, true)
        <-ch
 
-       sr = getInstances()
+       for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
+               sr = getInstances()
+               if len(sr.Items) > 0 {
+                       break
+               }
+               time.Sleep(time.Millisecond)
+       }
        c.Assert(len(sr.Items), check.Equals, 1)
        c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
        c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
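
The retry loop above (and the similar one added in TestKillNonexistentContainer further down) follows a poll-until-deadline pattern: re-check a cheap condition every millisecond until it holds or a one-second deadline passes. A generic, illustrative form of it, with made-up names:

package main

import (
	"fmt"
	"time"
)

// eventually polls ok() every interval until it returns true or
// timeout elapses, returning the final result.
func eventually(timeout, interval time.Duration, ok func() bool) bool {
	for deadline := time.Now().Add(timeout); time.Now().Before(deadline); time.Sleep(interval) {
		if ok() {
			return true
		}
	}
	return ok()
}

func main() {
	start := time.Now()
	done := eventually(time.Second, time.Millisecond, func() bool {
		return time.Since(start) > 20*time.Millisecond // stand-in for "instance list is non-empty"
	})
	fmt.Println("condition met:", done)
}
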
index 0343f85b91a7bc63d20034e111e04487608bef9c..eb1e48737c8b131cbb919ca71e8f6bbc377c553a 100644 (file)
@@ -6,12 +6,14 @@ package dispatchcloud
 
 import (
        "fmt"
+       "time"
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/cloud/azure"
        "git.curoverse.com/arvados.git/lib/cloud/ec2"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
 )
 
 var drivers = map[string]cloud.Driver{
@@ -24,5 +26,33 @@ func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger
        if !ok {
                return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
        }
-       return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+       is, err := driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+       if maxops := cluster.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
+               is = &rateLimitedInstanceSet{
+                       InstanceSet: is,
+                       ticker:      time.NewTicker(time.Second / time.Duration(maxops)),
+               }
+       }
+       return is, err
+}
+
+type rateLimitedInstanceSet struct {
+       cloud.InstanceSet
+       ticker *time.Ticker
+}
+
+func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+       <-is.ticker.C
+       inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
+       return &rateLimitedInstance{inst, is.ticker}, err
+}
+
+type rateLimitedInstance struct {
+       cloud.Instance
+       ticker *time.Ticker
+}
+
+func (inst *rateLimitedInstance) Destroy() error {
+       <-inst.ticker.C
+       return inst.Instance.Destroy()
 }
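
The wrapper above throttles Create and Destroy by waiting on a shared time.Ticker, so cloud API calls never exceed MaxCloudOpsPerSecond. A minimal standalone sketch of the same ticker-gating idea, with illustrative names:

package main

import (
	"fmt"
	"time"
)

type throttled struct {
	ticker *time.Ticker
}

func (t *throttled) do(op string) {
	<-t.ticker.C // block until the next slot is available
	fmt.Println(time.Now().Format("15:04:05.000"), op)
}

func main() {
	maxOpsPerSecond := 5
	t := &throttled{ticker: time.NewTicker(time.Second / time.Duration(maxOpsPerSecond))}
	defer t.ticker.Stop()
	for i := 0; i < 10; i++ {
		t.do(fmt.Sprintf("create instance #%d", i)) // never more than 5 ops/sec
	}
}
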
index 148b653c2e52305b2ece2255c49d98bf6cb72f50..1f9338f7b8d02e66d73239cc0f0c76ebc53ab423 100644 (file)
@@ -47,7 +47,6 @@ waiting:
                        // Give up.
                        break waiting
                }
-
        }
 
        for _, uuid := range stale {
index 18cdc94fa52156ceab01d7dbe135d8db20029176..307807e32337257f14d02178cbd6e98de61f7be8 100644 (file)
@@ -38,7 +38,7 @@ type WorkerPool interface {
        Create(arvados.InstanceType) bool
        Shutdown(arvados.InstanceType) bool
        StartContainer(arvados.InstanceType, arvados.Container) bool
-       KillContainer(uuid string)
+       KillContainer(uuid, reason string)
        Subscribe() <-chan struct{}
        Unsubscribe(<-chan struct{})
 }
index 4296a1364c911fc94d44af28512ecac195b4e5f5..dab324579dc84e9c8c93086fe693a638f029191f 100644 (file)
@@ -77,7 +77,7 @@ func (p *stubPool) Create(it arvados.InstanceType) bool {
        p.unalloc[it]++
        return true
 }
-func (p *stubPool) KillContainer(uuid string) {
+func (p *stubPool) KillContainer(uuid, reason string) {
        p.Lock()
        defer p.Unlock()
        delete(p.running, uuid)
@@ -335,3 +335,40 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
        }
        c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
 }
+
+func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+       pool := stubPool{
+               unalloc: map[arvados.InstanceType]int{
+                       test.InstanceType(2): 0,
+               },
+               idle: map[arvados.InstanceType]int{
+                       test.InstanceType(2): 0,
+               },
+               running: map[string]time.Time{
+                       test.ContainerUUID(2): time.Time{},
+               },
+       }
+       queue := test.Queue{
+               ChooseType: chooseType,
+               Containers: []arvados.Container{
+                       {
+                               // create a new worker
+                               UUID:     test.ContainerUUID(1),
+                               Priority: 1,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+               },
+       }
+       queue.Update()
+       sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+       c.Check(pool.running, check.HasLen, 1)
+       sch.sync()
+       for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+       }
+       c.Check(pool.Running(), check.HasLen, 0)
+}
index 23fc621dea26c76be659ddc4f88bea7565f4bd4c..99bee484c6f7162a3e875b7627738a555fb46e13 100644 (file)
@@ -30,11 +30,11 @@ func (sch *Scheduler) sync() {
                switch ent.Container.State {
                case arvados.ContainerStateRunning:
                        if !running {
-                               go sch.cancel(ent, "not running on any worker")
+                               go sch.cancel(uuid, "not running on any worker")
                        } else if !exited.IsZero() && qUpdated.After(exited) {
-                               go sch.cancel(ent, "state=\"Running\" after crunch-run exited")
+                               go sch.cancel(uuid, "state=Running after crunch-run exited")
                        } else if ent.Container.Priority == 0 {
-                               go sch.kill(ent, "priority=0")
+                               go sch.kill(uuid, "priority=0")
                        }
                case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
                        if running {
@@ -46,7 +46,7 @@ func (sch *Scheduler) sync() {
                                // of kill() will be to make the
                                // worker available for the next
                                // container.
-                               go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
                        } else {
                                sch.logger.WithFields(logrus.Fields{
                                        "ContainerUUID": uuid,
@@ -60,13 +60,13 @@ func (sch *Scheduler) sync() {
                                // a network outage and is still
                                // preparing to run a container that
                                // has already been unlocked/requeued.
-                               go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
                        }
                case arvados.ContainerStateLocked:
                        if running && !exited.IsZero() && qUpdated.After(exited) {
                                go sch.requeue(ent, "crunch-run exited")
                        } else if running && exited.IsZero() && ent.Container.Priority == 0 {
-                               go sch.kill(ent, "priority=0")
+                               go sch.kill(uuid, "priority=0")
                        } else if !running && ent.Container.Priority == 0 {
                                go sch.requeue(ent, "priority=0")
                        }
@@ -77,10 +77,14 @@ func (sch *Scheduler) sync() {
                        }).Error("BUG: unexpected state")
                }
        }
+       for uuid := range running {
+               if _, known := qEntries[uuid]; !known {
+                       go sch.kill(uuid, "not in queue")
+               }
+       }
 }
 
-func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
-       uuid := ent.Container.UUID
+func (sch *Scheduler) cancel(uuid string, reason string) {
        if !sch.uuidLock(uuid, "cancel") {
                return
        }
@@ -93,11 +97,8 @@ func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
        }
 }
 
-func (sch *Scheduler) kill(ent container.QueueEnt, reason string) {
-       uuid := ent.Container.UUID
-       logger := sch.logger.WithField("ContainerUUID", uuid)
-       logger.Debugf("killing crunch-run process because %s", reason)
-       sch.pool.KillContainer(uuid)
+func (sch *Scheduler) kill(uuid string, reason string) {
+       sch.pool.KillContainer(uuid, reason)
 }
 
 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
index e18a2b5362b413a16eeb28984bfabbe34796f6db..6a6c952358bcadff447d08869e641f32d24618ad 100644 (file)
@@ -158,14 +158,21 @@ func (q *Queue) Update() error {
 //
 // The resulting changes are not exposed through Get() or Entries()
 // until the next call to Update().
-func (q *Queue) Notify(upd arvados.Container) {
+//
+// Return value is true unless the update is rejected (invalid state
+// transition).
+func (q *Queue) Notify(upd arvados.Container) bool {
        q.mtx.Lock()
        defer q.mtx.Unlock()
        for i, ctr := range q.Containers {
                if ctr.UUID == upd.UUID {
-                       q.Containers[i] = upd
-                       return
+                       if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+                               q.Containers[i] = upd
+                               return true
+                       }
+                       return false
                }
        }
        q.Containers = append(q.Containers, upd)
+       return true
 }
index 4df39d0c46ac7dc538e5c7d9949cd884df8768b4..873d987327eafed2a53f6d63f0dcc17230dbeb0d 100644 (file)
@@ -126,6 +126,8 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
                tags:         copyTags(tags),
                providerType: it.ProviderType,
                initCommand:  cmd,
+               running:      map[string]int64{},
+               killing:      map[string]bool{},
        }
        svm.SSHService = SSHService{
                HostKey:        sis.driver.HostKey,
@@ -177,12 +179,14 @@ func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
 // running (and might change IP addresses, shut down, etc.)  without
 // updating any stubInstances that have been returned to callers.
 type StubVM struct {
-       Boot                 time.Time
-       Broken               time.Time
-       CrunchRunMissing     bool
-       CrunchRunCrashRate   float64
-       CrunchRunDetachDelay time.Duration
-       ExecuteContainer     func(arvados.Container) int
+       Boot                  time.Time
+       Broken                time.Time
+       ReportBroken          time.Time
+       CrunchRunMissing      bool
+       CrunchRunCrashRate    float64
+       CrunchRunDetachDelay  time.Duration
+       ExecuteContainer      func(arvados.Container) int
+       CrashRunningContainer func(arvados.Container)
 
        sis          *StubInstanceSet
        id           cloud.InstanceID
@@ -190,7 +194,9 @@ type StubVM struct {
        initCommand  cloud.InitCommand
        providerType string
        SSHService   SSHService
-       running      map[string]bool
+       running      map[string]int64
+       killing      map[string]bool
+       lastPID      int64
        sync.Mutex
 }
 
@@ -239,21 +245,21 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                }
                for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
                        if stdinKV[name] == "" {
-                               fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdin)
+                               fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
                                return 1
                        }
                }
                svm.Lock()
-               if svm.running == nil {
-                       svm.running = map[string]bool{}
-               }
-               svm.running[uuid] = true
+               svm.lastPID++
+               pid := svm.lastPID
+               svm.running[uuid] = pid
                svm.Unlock()
                time.Sleep(svm.CrunchRunDetachDelay)
                fmt.Fprintf(stderr, "starting %s\n", uuid)
                logger := svm.sis.logger.WithFields(logrus.Fields{
                        "Instance":      svm.id,
                        "ContainerUUID": uuid,
+                       "PID":           pid,
                })
                logger.Printf("[test] starting crunch-run stub")
                go func() {
@@ -263,37 +269,43 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                                logger.Print("[test] container not in queue")
                                return
                        }
+
+                       defer func() {
+                               if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil {
+                                       svm.CrashRunningContainer(ctr)
+                               }
+                       }()
+
                        if crashluck > svm.CrunchRunCrashRate/2 {
                                time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
                                ctr.State = arvados.ContainerStateRunning
-                               queue.Notify(ctr)
+                               if !queue.Notify(ctr) {
+                                       ctr, _ = queue.Get(uuid)
+                                       logger.Print("[test] erroring out because state=Running update was rejected")
+                                       return
+                               }
                        }
 
                        time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+
                        svm.Lock()
-                       _, running := svm.running[uuid]
-                       svm.Unlock()
-                       if !running {
+                       defer svm.Unlock()
+                       if svm.running[uuid] != pid {
                                logger.Print("[test] container was killed")
                                return
                        }
-                       if svm.ExecuteContainer != nil {
-                               ctr.ExitCode = svm.ExecuteContainer(ctr)
-                       }
-                       // TODO: Check whether the stub instance has
-                       // been destroyed, and if so, don't call
-                       // queue.Notify. Then "container finished
-                       // twice" can be classified as a bug.
+                       delete(svm.running, uuid)
+
                        if crashluck < svm.CrunchRunCrashRate {
-                               logger.Print("[test] crashing crunch-run stub")
+                               logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
                        } else {
+                               if svm.ExecuteContainer != nil {
+                                       ctr.ExitCode = svm.ExecuteContainer(ctr)
+                               }
+                               logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub")
                                ctr.State = arvados.ContainerStateComplete
-                               queue.Notify(ctr)
+                               go queue.Notify(ctr)
                        }
-                       logger.Print("[test] exiting crunch-run stub")
-                       svm.Lock()
-                       defer svm.Unlock()
-                       delete(svm.running, uuid)
                }()
                return 0
        }
@@ -303,17 +315,41 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                for uuid := range svm.running {
                        fmt.Fprintf(stdout, "%s\n", uuid)
                }
+               if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
+                       fmt.Fprintln(stdout, "broken")
+               }
                return 0
        }
        if strings.HasPrefix(command, "crunch-run --kill ") {
                svm.Lock()
-               defer svm.Unlock()
-               if svm.running[uuid] {
-                       delete(svm.running, uuid)
+               pid, running := svm.running[uuid]
+               if running && !svm.killing[uuid] {
+                       svm.killing[uuid] = true
+                       go func() {
+                               time.Sleep(time.Duration(math_rand.Float64()*30) * time.Millisecond)
+                               svm.Lock()
+                               defer svm.Unlock()
+                               if svm.running[uuid] == pid {
+                                       // Kill only if the running entry
+                                       // hasn't since been killed and
+                                       // replaced with a different one.
+                                       delete(svm.running, uuid)
+                               }
+                               delete(svm.killing, uuid)
+                       }()
+                       svm.Unlock()
+                       time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
+                       svm.Lock()
+                       _, running = svm.running[uuid]
+               }
+               svm.Unlock()
+               if running {
+                       fmt.Fprintf(stderr, "%s: container is running\n", uuid)
+                       return 1
                } else {
                        fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+                       return 0
                }
-               return 0
        }
        if command == "true" {
                return 0
index e81c2c091f1c37c7b52488b4d919bdb9a9fe4d79..014ab93bfe9c7289bcd99286379a3a26bbc38b18 100644 (file)
@@ -68,6 +68,8 @@ const (
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
+       defaultTimeoutTERM        = time.Minute * 2
+       defaultTimeoutSignal      = time.Second * 5
 
        // Time after a quota error to try again anyway, even if no
        // instances have been shutdown.
@@ -105,6 +107,8 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               timeoutTERM:        duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+               timeoutSignal:      duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
                installPublicKey:   installPublicKey,
                stop:               make(chan bool),
        }
@@ -136,6 +140,8 @@ type Pool struct {
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
+       timeoutTERM        time.Duration
+       timeoutSignal      time.Duration
        installPublicKey   ssh.PublicKey
 
        // private state
@@ -319,9 +325,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
        if !ok {
                return errors.New("requested instance does not exist")
        }
-       wkr.idleBehavior = idleBehavior
-       wkr.saveTags()
-       wkr.shutdownIfIdle()
+       wkr.setIdleBehavior(idleBehavior)
        return nil
 }
 
@@ -383,19 +387,14 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor
                probed:       now,
                busy:         now,
                updated:      now,
-               running:      make(map[string]struct{}),
-               starting:     make(map[string]struct{}),
+               running:      make(map[string]*remoteRunner),
+               starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
 }
 
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
-       wp.exited[uuid] = t
-}
-
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
@@ -419,8 +418,12 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
 }
 
 // CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
 func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
+       wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
@@ -482,53 +485,29 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Reason":        reason,
+       })
        if _, ok := wp.exited[uuid]; ok {
-               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               logger.Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
-               if _, ok := wkr.running[uuid]; ok {
-                       go wp.kill(wkr, uuid)
-                       return
+               rr := wkr.running[uuid]
+               if rr == nil {
+                       rr = wkr.starting[uuid]
                }
-       }
-       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
-       logger := wp.logger.WithFields(logrus.Fields{
-               "ContainerUUID": uuid,
-               "Instance":      wkr.instance.ID(),
-       })
-       logger.Debug("killing process")
-       cmd := "crunch-run --kill 15 " + uuid
-       if u := wkr.instance.RemoteUser(); u != "root" {
-               cmd = "sudo " + cmd
-       }
-       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
-       if err != nil {
-               logger.WithFields(logrus.Fields{
-                       "stderr": string(stderr),
-                       "stdout": string(stdout),
-                       "error":  err,
-               }).Warn("kill failed")
-               return
-       }
-       logger.Debug("killing process succeeded")
-       wp.mtx.Lock()
-       defer wp.mtx.Unlock()
-       if _, ok := wkr.running[uuid]; ok {
-               delete(wkr.running, uuid)
-               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
-                       wkr.state = StateIdle
+               if rr != nil {
+                       rr.Kill(reason)
+                       return
                }
-               wkr.updated = time.Now()
-               go wp.notify()
        }
+       logger.Debug("cannot kill: already disappeared")
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
@@ -712,6 +691,18 @@ func (wp *Pool) Instances() []InstanceView {
        return r
 }
 
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+       wkr, ok := wp.workers[id]
+       if !ok {
+               return errors.New("instance not found")
+       }
+       wkr.logger.WithField("Reason", reason).Info("shutting down")
+       wkr.shutdown()
+       return nil
+}
+
 func (wp *Pool) setup() {
        wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
@@ -781,11 +772,12 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
-               go wkr.executor.Close()
+               go wkr.Close()
                notify = true
        }
 
        if !wp.loaded {
+               notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }
@@ -795,6 +787,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        }
 }
 
+func (wp *Pool) waitUntilLoaded() {
+       ch := wp.Subscribe()
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       for !wp.loaded {
+               wp.mtx.RUnlock()
+               <-ch
+               wp.mtx.RLock()
+       }
+}
+
 // Return a random string of n hexadecimal digits (n*4 random bits). n
 // must be even.
 func randomHex(n int) string {
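
waitUntilLoaded above relies on the pool's existing Subscribe/notify mechanism: the waiter drops its read lock, blocks on the notification channel, then re-checks the flag. A self-contained sketch of that pattern with illustrative names and a simulated first cloud sync:

package main

import (
	"fmt"
	"sync"
	"time"
)

type pool struct {
	mtx    sync.RWMutex
	loaded bool
	subs   []chan struct{}
}

func (p *pool) subscribe() <-chan struct{} {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	ch := make(chan struct{}, 1)
	p.subs = append(p.subs, ch)
	return ch
}

func (p *pool) notify() {
	p.mtx.RLock()
	defer p.mtx.RUnlock()
	for _, ch := range p.subs {
		select {
		case ch <- struct{}{}: // non-blocking wakeup
		default:
		}
	}
}

func (p *pool) waitUntilLoaded() {
	ch := p.subscribe()
	p.mtx.RLock()
	defer p.mtx.RUnlock()
	for !p.loaded {
		p.mtx.RUnlock()
		<-ch // wait for the next notify()
		p.mtx.RLock()
	}
}

func main() {
	p := &pool{}
	go func() {
		time.Sleep(50 * time.Millisecond) // simulate the first cloud sync
		p.mtx.Lock()
		p.loaded = true
		p.mtx.Unlock()
		p.notify()
	}()
	p.waitUntilLoaded()
	fmt.Println("initial instance list loaded")
}
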
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
new file mode 100644 (file)
index 0000000..c30ff9f
--- /dev/null
@@ -0,0 +1,154 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "syscall"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
+)
+
+// remoteRunner handles the starting and stopping of a crunch-run
+// process on a remote machine.
+type remoteRunner struct {
+       uuid          string
+       executor      Executor
+       arvClient     *arvados.Client
+       remoteUser    string
+       timeoutTERM   time.Duration
+       timeoutSignal time.Duration
+       onUnkillable  func(uuid string) // callback invoked when giving up on SIGTERM
+       onKilled      func(uuid string) // callback invoked when process exits after SIGTERM
+       logger        logrus.FieldLogger
+
+       stopping bool          // true if Kill() has been called
+       givenup  bool          // true if timeoutTERM has been reached
+       closed   chan struct{} // channel is closed if Close() has been called
+}
+
+// newRemoteRunner returns a new remoteRunner. Caller should ensure
+// Close() is called to release resources.
+func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+       rr := &remoteRunner{
+               uuid:          uuid,
+               executor:      wkr.executor,
+               arvClient:     wkr.wp.arvClient,
+               remoteUser:    wkr.instance.RemoteUser(),
+               timeoutTERM:   wkr.wp.timeoutTERM,
+               timeoutSignal: wkr.wp.timeoutSignal,
+               onUnkillable:  wkr.onUnkillable,
+               onKilled:      wkr.onKilled,
+               logger:        wkr.logger.WithField("ContainerUUID", uuid),
+               closed:        make(chan struct{}),
+       }
+       return rr
+}
+
+// Start a crunch-run process on the remote host.
+//
+// Start does not return any error encountered. The caller should
+// assume the remote process _might_ have started, at least until it
+// probes the worker and finds otherwise.
+func (rr *remoteRunner) Start() {
+       env := map[string]string{
+               "ARVADOS_API_HOST":  rr.arvClient.APIHost,
+               "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
+       }
+       if rr.arvClient.Insecure {
+               env["ARVADOS_API_HOST_INSECURE"] = "1"
+       }
+       envJSON, err := json.Marshal(env)
+       if err != nil {
+               panic(err)
+       }
+       stdin := bytes.NewBuffer(envJSON)
+       cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+       if rr.remoteUser != "root" {
+               cmd = "sudo " + cmd
+       }
+       stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
+       if err != nil {
+               rr.logger.WithField("stdout", string(stdout)).
+                       WithField("stderr", string(stderr)).
+                       WithError(err).
+                       Error("error starting crunch-run process")
+               return
+       }
+       rr.logger.Info("crunch-run process started")
+}
+
+// Close abandons the remote process (if any) and releases
+// resources. Close must not be called more than once.
+func (rr *remoteRunner) Close() {
+       close(rr.closed)
+}
+
+// Kill starts a background task to kill the remote process, first
+// trying SIGTERM until reaching timeoutTERM, then calling
+// onUnkillable().
+//
+// SIGKILL is not used. It would merely kill the crunch-run supervisor
+// and thereby make the docker container, arv-mount, etc. invisible to
+// us without actually stopping them.
+//
+// Once Kill has been called, calling it again has no effect.
+func (rr *remoteRunner) Kill(reason string) {
+       if rr.stopping {
+               return
+       }
+       rr.stopping = true
+       rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
+       go func() {
+               termDeadline := time.Now().Add(rr.timeoutTERM)
+               t := time.NewTicker(rr.timeoutSignal)
+               defer t.Stop()
+               for range t.C {
+                       switch {
+                       case rr.isClosed():
+                               return
+                       case time.Now().After(termDeadline):
+                               rr.logger.Debug("giving up")
+                               rr.givenup = true
+                               rr.onUnkillable(rr.uuid)
+                               return
+                       default:
+                               rr.kill(syscall.SIGTERM)
+                       }
+               }
+       }()
+}
+
+func (rr *remoteRunner) kill(sig syscall.Signal) {
+       logger := rr.logger.WithField("Signal", int(sig))
+       logger.Info("sending signal")
+       cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+       if rr.remoteUser != "root" {
+               cmd = "sudo " + cmd
+       }
+       stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
+       if err != nil {
+               logger.WithFields(logrus.Fields{
+                       "stderr": string(stderr),
+                       "stdout": string(stdout),
+                       "error":  err,
+               }).Info("kill attempt unsuccessful")
+               return
+       }
+       rr.onKilled(rr.uuid)
+}
+
+func (rr *remoteRunner) isClosed() bool {
+       select {
+       case <-rr.closed:
+               return true
+       default:
+               return false
+       }
+}
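
The heart of Kill above is a ticker-driven retry loop: resend SIGTERM every timeoutSignal until the process is confirmed gone or timeoutTERM expires, then give up and let onUnkillable drain the worker. A minimal sketch of that loop in isolation; sendTERM here is a stand-in for the real kill command:

package main

import (
	"fmt"
	"time"
)

// killWithRetries keeps sending SIGTERM (via sendTERM, which reports
// whether the process is gone) until success or timeoutTERM passes.
func killWithRetries(sendTERM func() bool, timeoutTERM, timeoutSignal time.Duration) bool {
	deadline := time.Now().Add(timeoutTERM)
	ticker := time.NewTicker(timeoutSignal)
	defer ticker.Stop()
	for range ticker.C {
		switch {
		case time.Now().After(deadline):
			return false // give up; the caller drains the worker
		case sendTERM():
			return true // crunch-run confirmed gone
		}
	}
	return false // unreachable; satisfies the compiler
}

func main() {
	attempts := 0
	gone := killWithRetries(func() bool {
		attempts++
		return attempts >= 3 // pretend the third SIGTERM succeeds
	}, time.Second, 50*time.Millisecond)
	fmt.Println("killed:", gone, "after", attempts, "attempts")
}
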
index 64e1f7797af8634be63502faea5faaaa8b30a5f9..49c5057b3842e49da945d40c3950f7c2185dfcc5 100644 (file)
@@ -5,8 +5,6 @@
 package worker
 
 import (
-       "bytes"
-       "encoding/json"
        "fmt"
        "strings"
        "sync"
@@ -87,62 +85,60 @@ type worker struct {
        busy         time.Time
        destroyed    time.Time
        lastUUID     string
-       running      map[string]struct{} // remember to update state idle<->running when this changes
-       starting     map[string]struct{} // remember to update state idle<->running when this changes
+       running      map[string]*remoteRunner // remember to update state idle<->running when this changes
+       starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
        probing      chan struct{}
 }
 
+func (wkr *worker) onUnkillable(uuid string) {
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       logger := wkr.logger.WithField("ContainerUUID", uuid)
+       if wkr.idleBehavior == IdleBehaviorHold {
+               logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+               return
+       }
+       logger.Warn("unkillable container, draining worker")
+       wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       wkr.closeRunner(uuid)
+       go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+       wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+       wkr.idleBehavior = idleBehavior
+       wkr.saveTags()
+       wkr.shutdownIfIdle()
+}
+
 // caller must have lock.
 func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
-       logger = logger.WithField("Instance", wkr.instance.ID())
        logger.Debug("starting container")
-       wkr.starting[ctr.UUID] = struct{}{}
+       rr := newRemoteRunner(ctr.UUID, wkr)
+       wkr.starting[ctr.UUID] = rr
        if wkr.state != StateRunning {
                wkr.state = StateRunning
                go wkr.wp.notify()
        }
        go func() {
-               env := map[string]string{
-                       "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
-                       "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
-               }
-               if wkr.wp.arvClient.Insecure {
-                       env["ARVADOS_API_HOST_INSECURE"] = "1"
-               }
-               envJSON, err := json.Marshal(env)
-               if err != nil {
-                       panic(err)
-               }
-               stdin := bytes.NewBuffer(envJSON)
-               cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
-               if u := wkr.instance.RemoteUser(); u != "root" {
-                       cmd = "sudo " + cmd
-               }
-               stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
+               rr.Start()
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
-               wkr.running[ctr.UUID] = struct{}{}
-               wkr.lastUUID = ctr.UUID
-               if err != nil {
-                       logger.WithField("stdout", string(stdout)).
-                               WithField("stderr", string(stderr)).
-                               WithError(err).
-                               Error("error starting crunch-run process")
-                       // Leave uuid in wkr.running, though: it's
-                       // possible the error was just a communication
-                       // failure and the process was in fact
-                       // started.  Wait for next probe to find out.
-                       return
-               }
-               logger.Info("crunch-run process started")
+               wkr.running[ctr.UUID] = rr
                wkr.lastUUID = ctr.UUID
        }()
 }
@@ -218,11 +214,16 @@ func (wkr *worker) probeAndUpdate() {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
+       reportedBroken := false
        if booted || wkr.state == StateUnknown {
-               ctrUUIDs, ok = wkr.probeRunning()
+               ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
+       if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+               logger.Info("probe reported broken instance")
+               wkr.setIdleBehavior(IdleBehaviorDrain)
+       }
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
@@ -274,31 +275,8 @@ func (wkr *worker) probeAndUpdate() {
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }
-       changed := false
 
-       // Build a new "running" map. Set changed=true if it differs
-       // from the existing map (wkr.running) to ensure the scheduler
-       // gets notified below.
-       running := map[string]struct{}{}
-       for _, uuid := range ctrUUIDs {
-               running[uuid] = struct{}{}
-               if _, ok := wkr.running[uuid]; !ok {
-                       if _, ok := wkr.starting[uuid]; !ok {
-                               // We didn't start it -- it must have
-                               // been started by a previous
-                               // dispatcher process.
-                               logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
-                       }
-                       changed = true
-               }
-       }
-       for uuid := range wkr.running {
-               if _, ok := running[uuid]; !ok {
-                       logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
-                       wkr.wp.notifyExited(uuid, updateTime)
-                       changed = true
-               }
-       }
+       changed := wkr.updateRunning(ctrUUIDs)
 
        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
@@ -317,14 +295,13 @@ func (wkr *worker) probeAndUpdate() {
 
        // Log whenever a run-probe reveals crunch-run processes
        // appearing/disappearing before boot-probe succeeds.
-       if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+       if wkr.state == StateUnknown && changed {
                logger.WithFields(logrus.Fields{
-                       "RunningContainers": len(running),
+                       "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }
 
-       wkr.running = running
        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
@@ -333,14 +310,14 @@ func (wkr *worker) probeAndUpdate() {
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
                logger.WithFields(logrus.Fields{
-                       "RunningContainers": len(running),
+                       "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
        go wkr.wp.notify()
 }
 
-func (wkr *worker) probeRunning() (running []string, ok bool) {
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        cmd := "crunch-run --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
@@ -352,13 +329,17 @@ func (wkr *worker) probeRunning() (running []string, ok bool) {
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
-               return nil, false
+               return
        }
-       stdout = bytes.TrimRight(stdout, "\n")
-       if len(stdout) == 0 {
-               return nil, true
+       ok = true
+       for _, s := range strings.Split(string(stdout), "\n") {
+               if s == "broken" {
+                       reportsBroken = true
+               } else if s != "" {
+                       running = append(running, s)
+               }
        }
-       return strings.Split(string(stdout), "\n"), true
+       return
 }
 
 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
@@ -402,27 +383,52 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        return true
 }
 
+// Returns true if the instance is eligible for shutdown: either it's
+// been idle too long, or idleBehavior=Drain and nothing is running.
+//
 // caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
-               // Never shut down.
                return false
        }
-       age := time.Since(wkr.busy)
-
-       old := age >= wkr.wp.timeoutIdle
        draining := wkr.idleBehavior == IdleBehaviorDrain
-       shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
-               (draining && wkr.state == StateBooting)
-       if !shouldShutdown {
+       switch wkr.state {
+       case StateBooting:
+               return draining
+       case StateIdle:
+               return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+       case StateRunning:
+               if !draining {
+                       return false
+               }
+               for _, rr := range wkr.running {
+                       if !rr.givenup {
+                               return false
+                       }
+               }
+               for _, rr := range wkr.starting {
+                       if !rr.givenup {
+                               return false
+                       }
+               }
+               // draining, and all remaining runners are just trying
+               // to force-kill their crunch-run procs
+               return true
+       default:
                return false
        }
+}
 
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+       if !wkr.eligibleForShutdown() {
+               return false
+       }
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
-               "IdleDuration": stats.Duration(age),
+               "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
-       }).Info("shutdown idle worker")
+       }).Info("shutdown worker")
        wkr.shutdown()
        return true
 }
@@ -468,3 +474,68 @@ func (wkr *worker) saveTags() {
                }()
        }
 }
+
+func (wkr *worker) Close() {
+       // This might take time, so do it after unlocking mtx.
+       defer wkr.executor.Close()
+
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       for uuid, rr := range wkr.running {
+               wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+               rr.Close()
+       }
+       for uuid, rr := range wkr.starting {
+               wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+               rr.Close()
+       }
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+       alive := map[string]bool{}
+       for _, uuid := range ctrUUIDs {
+               alive[uuid] = true
+               if _, ok := wkr.running[uuid]; ok {
+                       // unchanged
+               } else if rr, ok := wkr.starting[uuid]; ok {
+                       wkr.running[uuid] = rr
+                       delete(wkr.starting, uuid)
+                       changed = true
+               } else {
+                       // We didn't start it -- it must have been
+                       // started by a previous dispatcher process.
+                       wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+                       wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+                       changed = true
+               }
+       }
+       for uuid := range wkr.running {
+               if !alive[uuid] {
+                       wkr.closeRunner(uuid)
+                       changed = true
+               }
+       }
+       return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+       rr := wkr.running[uuid]
+       if rr == nil {
+               return
+       }
+       wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+       delete(wkr.running, uuid)
+       rr.Close()
+
+       now := time.Now()
+       wkr.updated = now
+       wkr.wp.exited[uuid] = now
+       if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+               wkr.state = StateIdle
+       }
+}
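
The new eligibleForShutdown rules read as a small decision table: Hold never shuts down; Booting shuts down only when draining; Idle when draining or idle past timeoutIdle; Running only when draining and every remaining runner has given up on SIGTERM. A condensed, illustrative sketch as a pure function (the real method inspects wkr.running and wkr.starting directly):

package main

import (
	"fmt"
	"time"
)

type state string

const (
	booting state = "booting"
	idle    state = "idle"
	running state = "running"
)

func eligibleForShutdown(st state, idleBehavior string, idleFor, timeoutIdle time.Duration, allRunnersGivenUp bool) bool {
	if idleBehavior == "hold" {
		return false
	}
	draining := idleBehavior == "drain"
	switch st {
	case booting:
		return draining
	case idle:
		return draining || idleFor >= timeoutIdle
	case running:
		// Only when draining, and only once every remaining runner
		// has given up trying to SIGTERM its crunch-run process.
		return draining && allRunnersGivenUp
	default:
		return false
	}
}

func main() {
	fmt.Println(eligibleForShutdown(idle, "run", 3*time.Minute, 2*time.Minute, false)) // true: idle too long
	fmt.Println(eligibleForShutdown(running, "drain", 0, 2*time.Minute, false))        // false: still killing
	fmt.Println(eligibleForShutdown(running, "drain", 0, 2*time.Minute, true))         // true: all runners gave up
}
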
index 3bc33b62c9fee896f107278a564b859d1448366e..15a2a894c5bceb89bdae4f6c5e4146d317dca083 100644 (file)
@@ -209,12 +209,17 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                        busy:     ctime,
                        probed:   ctime,
                        updated:  ctime,
+                       running:  map[string]*remoteRunner{},
+                       starting: map[string]*remoteRunner{},
+                       probing:  make(chan struct{}, 1),
                }
                if trial.running > 0 {
-                       wkr.running = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+                       uuid := "zzzzz-dz642-abcdefghijklmno"
+                       wkr.running = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
                }
                if trial.starting > 0 {
-                       wkr.starting = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+                       uuid := "zzzzz-dz642-bcdefghijklmnop"
+                       wkr.starting = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
                }
                wkr.probeAndUpdate()
                c.Check(wkr.state, check.Equals, trial.expectState)
index d99af0eea15428054fd5adc16596ca89b1de7820..e853da943222aa2182b01f41d12ebb3cbec5193a 100644 (file)
@@ -11,6 +11,7 @@ import (
        "fmt"
        "io"
        "net/http"
+       "net/url"
        "os"
 
        "git.curoverse.com/arvados.git/lib/cmd"
@@ -26,11 +27,12 @@ type Handler interface {
        CheckHealth() error
 }
 
-type NewHandlerFunc func(context.Context, *arvados.Cluster, *arvados.NodeProfile) Handler
+type NewHandlerFunc func(_ context.Context, _ *arvados.Cluster, _ *arvados.NodeProfile, token string) Handler
 
 type command struct {
        newHandler NewHandlerFunc
        svcName    arvados.ServiceName
+       ctx        context.Context // enables tests to shutdown service; no public API yet
 }
 
 // Command returns a cmd.Handler that loads site config, calls
@@ -43,6 +45,7 @@ func Command(svcName arvados.ServiceName, newHandler NewHandlerFunc) cmd.Handler
        return &command{
                newHandler: newHandler,
                svcName:    svcName,
+               ctx:        context.Background(),
        }
 }
 
@@ -77,7 +80,8 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        log = ctxlog.New(stderr, cluster.Logging.Format, cluster.Logging.Level).WithFields(logrus.Fields{
                "PID": os.Getpid(),
        })
-       ctx := ctxlog.Context(context.Background(), log)
+       ctx := ctxlog.Context(c.ctx, log)
+
        profileName := *nodeProfile
        if profileName == "" {
                profileName = os.Getenv("ARVADOS_NODE_PROFILE")
@@ -91,7 +95,25 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName)
                return 1
        }
-       handler := c.newHandler(ctx, cluster, profile)
+
+       if cluster.SystemRootToken == "" {
+               log.Warn("SystemRootToken missing from cluster config, falling back to ARVADOS_API_TOKEN environment variable")
+               cluster.SystemRootToken = os.Getenv("ARVADOS_API_TOKEN")
+       }
+       if cluster.Services.Controller.ExternalURL.Host == "" {
+               log.Warn("Services.Controller.ExternalURL missing from cluster config, falling back to ARVADOS_API_HOST(_INSECURE) environment variables")
+               u, err := url.Parse("https://" + os.Getenv("ARVADOS_API_HOST"))
+               if err != nil {
+                       err = fmt.Errorf("ARVADOS_API_HOST: %s", err)
+                       return 1
+               }
+               cluster.Services.Controller.ExternalURL = arvados.URL(*u)
+               if i := os.Getenv("ARVADOS_API_HOST_INSECURE"); i != "" && i != "0" {
+                       cluster.TLS.Insecure = true
+               }
+       }
+
+       handler := c.newHandler(ctx, cluster, profile, cluster.SystemRootToken)
        if err = handler.CheckHealth(); err != nil {
                return 1
        }
@@ -112,6 +134,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
                log.WithError(err).Errorf("error notifying init daemon")
        }
+       go func() {
+               <-ctx.Done()
+               srv.Close()
+       }()
        err = srv.Wait()
        if err != nil {
                return 1
diff --git a/lib/service/cmd_test.go b/lib/service/cmd_test.go
new file mode 100644 (file)
index 0000000..62960dc
--- /dev/null
@@ -0,0 +1,78 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package service provides a cmd.Handler that brings up a system service.
+package service
+
+import (
+       "bytes"
+       "context"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "os"
+       "testing"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (*Suite) TestCommand(c *check.C) {
+       cf, err := ioutil.TempFile("", "cmd_test.")
+       c.Assert(err, check.IsNil)
+       defer os.Remove(cf.Name())
+       defer cf.Close()
+       fmt.Fprintf(cf, "Clusters:\n zzzzz:\n  SystemRootToken: abcde\n  NodeProfiles: {\"*\": {\"arvados-controller\": {Listen: \":1234\"}}}")
+
+       healthCheck := make(chan bool, 1)
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, _ *arvados.NodeProfile, token string) Handler {
+               c.Check(ctx.Value("foo"), check.Equals, "bar")
+               c.Check(token, check.Equals, "abcde")
+               return &testHandler{ctx: ctx, healthCheck: healthCheck}
+       })
+       cmd.(*command).ctx = context.WithValue(ctx, "foo", "bar")
+
+       done := make(chan bool)
+       var stdin, stdout, stderr bytes.Buffer
+
+       go func() {
+               cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
+               close(done)
+       }()
+       select {
+       case <-healthCheck:
+       case <-done:
+               c.Error("command exited without health check")
+       }
+       cancel()
+       c.Check(stdout.String(), check.Equals, "")
+       c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
+}
+
+type testHandler struct {
+       ctx         context.Context
+       healthCheck chan bool
+}
+
+func (th *testHandler) ServeHTTP(http.ResponseWriter, *http.Request) {}
+func (th *testHandler) CheckHealth() error {
+       ctxlog.FromContext(th.ctx).Info("CheckHealth called")
+       select {
+       case th.healthCheck <- true:
+       default:
+       }
+       return nil
+}
diff --git a/lib/service/error.go b/lib/service/error.go
new file mode 100644 (file)
index 0000000..8955210
--- /dev/null
@@ -0,0 +1,38 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package service
+
+import (
+       "context"
+       "net/http"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "github.com/sirupsen/logrus"
+)
+
+// ErrorHandler returns a Handler that reports itself as unhealthy and
+// responds 500 to all requests.  ErrorHandler itself logs the given
+// error once, and the handler logs it again for each incoming
+// request.
+func ErrorHandler(ctx context.Context, _ *arvados.Cluster, _ *arvados.NodeProfile, err error) Handler {
+       logger := ctxlog.FromContext(ctx)
+       logger.WithError(err).Error("unhealthy service")
+       return errorHandler{err, logger}
+}
+
+type errorHandler struct {
+       err    error
+       logger logrus.FieldLogger
+}
+
+func (eh errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       eh.logger.WithError(eh.err).Error("unhealthy service")
+       http.Error(w, "", http.StatusInternalServerError)
+}
+
+func (eh errorHandler) CheckHealth() error {
+       return eh.err
+}
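
A typical use, sketched for illustration (newRealHandler is hypothetical and not part of this patch): a service's handler constructor can fall back to ErrorHandler when startup fails, so the process keeps serving requests and failing health checks instead of crash-looping.

    // Sketch: fall back to ErrorHandler on startup failure.
    func newHandler(ctx context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile, token string) service.Handler {
        h, err := newRealHandler(ctx, cluster, token) // newRealHandler is assumed, not shown
        if err != nil {
            // Serve 500s and report unhealthy, but keep the process up.
            return service.ErrorHandler(ctx, cluster, np, err)
        }
        return h
    }
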
index c7e20e2a72947d2e74f147e6a6c0fd68d14254f8..60aeb1892b11e13c980c8c2a97e10da9fbc7a639 100644 (file)
@@ -33,8 +33,8 @@ Gem::Specification.new do |s|
   s.add_runtime_dependency 'arvados', '~> 1.3.0', '>= 1.3.0'
   # Our google-api-client dependency used to be < 0.9, but that could be
   # satisfied by the buggy 0.9.pre*.  https://dev.arvados.org/issues/9213
-  s.add_runtime_dependency 'cure-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
-  s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5'
+  s.add_runtime_dependency 'arvados-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
+  s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5.1'
   s.add_runtime_dependency 'json', '>= 1.7.7', '<3'
   s.add_runtime_dependency 'optimist', '~> 3.0'
   s.add_runtime_dependency 'andand', '~> 1.3', '>= 1.3.3'
index 834ca195fdda02d859eafcd4aa0cea5c70c1c359..95711762c9a421a94c3581b165d9dbd6522a99e6 100644 (file)
@@ -293,7 +293,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
         logger.exception("Error creating the Arvados CWL Executor")
         return 1
 
-    # Note that unless in debug mode, some stack traces related to user 
+    # Note that unless in debug mode, some stack traces related to user
     # workflow errors may be suppressed. See ArvadosJob.done().
     if arvargs.debug:
         logger.setLevel(logging.DEBUG)
index 03b4e07c76f5849a97ae85b9bd179e897ec8fc33..b194f3dfcaa0be1cf676ea771e008cbcc9b0b2c0 100644 (file)
@@ -123,6 +123,8 @@ class ArvadosContainer(JobBase):
                 "kind": "collection",
                 "portable_data_hash": pdh
             }
+            if pdh in self.pathmapper.pdh_to_uuid:
+                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
             if len(sp) == 2:
                 if tp == "Directory":
                     path = sp[1]
@@ -329,8 +331,8 @@ class ArvadosContainer(JobBase):
             else:
                 processStatus = "permanentFail"
 
-            if processStatus == "permanentFail":
-                logc = arvados.collection.CollectionReader(container["log"],
+            if processStatus == "permanentFail" and record["log_uuid"]:
+                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
@@ -353,8 +355,8 @@ class ArvadosContainer(JobBase):
             if container["output"]:
                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
         except WorkflowException as e:
-            # Only include a stack trace if in debug mode. 
-            # A stack trace may obfuscate more useful output about the workflow. 
+            # Only include a stack trace if in debug mode.
+            # A stack trace may obfuscate more useful output about the workflow.
             logger.error("%s unable to collect output from %s:\n%s",
                          self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
             processStatus = "permanentFail"
index 319e8a887114b88b55865ca673dbafb3e0b9a7dc..c35842616696f7dc00d047969ea74f27aad332df 100644 (file)
@@ -87,7 +87,7 @@ class RuntimeStatusLoggingHandler(logging.Handler):
                     )
             finally:
                 self.updatingRuntimeStatus = False
-            
+
 
 class ArvCwlExecutor(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
@@ -475,7 +475,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
         with final.open("cwl.output.json", "w") as f:
             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
-            f.write(res)           
+            f.write(res)
 
         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
 
index 3744b4a93afa40df10c17335310503500acd2432..252ca57d47bb30ad1834e7070b3cfdf1e0ffdbb1 100644 (file)
@@ -63,24 +63,27 @@ class CollectionCache(object):
             del self.collections[pdh]
             self.total -= v[1]
 
-    def get(self, pdh):
+    def get(self, locator):
         with self.lock:
-            if pdh not in self.collections:
-                m = pdh_size.match(pdh)
+            if locator not in self.collections:
+                m = pdh_size.match(locator)
                 if m:
                     self.cap_cache(int(m.group(2)) * 128)
-                logger.debug("Creating collection reader for %s", pdh)
-                cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
-                                                         keep_client=self.keep_client,
-                                                         num_retries=self.num_retries)
+                logger.debug("Creating collection reader for %s", locator)
+                try:
+                    cr = arvados.collection.CollectionReader(locator, api_client=self.api_client,
+                                                             keep_client=self.keep_client,
+                                                             num_retries=self.num_retries)
+                except arvados.errors.ApiError as ap:
+                    raise IOError(errno.ENOENT, "Could not access collection '%s': %s" % (locator, str(ap._get_reason())))
                 sz = len(cr.manifest_text()) * 128
-                self.collections[pdh] = (cr, sz)
+                self.collections[locator] = (cr, sz)
                 self.total += sz
             else:
-                cr, sz = self.collections[pdh]
+                cr, sz = self.collections[locator]
                 # bump it to the back
-                del self.collections[pdh]
-                self.collections[pdh] = (cr, sz)
+                del self.collections[locator]
+                self.collections[locator] = (cr, sz)
             return cr
 
 
@@ -94,9 +97,10 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
     def get_collection(self, path):
         sp = path.split("/", 1)
         p = sp[0]
-        if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
-            pdh = p[5:]
-            return (self.collection_cache.get(pdh), urllib.parse.unquote(sp[1]) if len(sp) == 2 else None)
+        if p.startswith("keep:") and (arvados.util.keep_locator_pattern.match(p[5:]) or
+                                      arvados.util.collection_uuid_pattern.match(p[5:])):
+            locator = p[5:]
+            return (self.collection_cache.get(locator), urllib.parse.unquote(sp[1]) if len(sp) == 2 else None)
         else:
             return (None, path)
 
@@ -261,9 +265,11 @@ class CollectionFetcher(DefaultFetcher):
             baseparts = basesp.path.split("/")
             urlparts = urlsp.path.split("/") if urlsp.path else []
 
-            pdh = baseparts.pop(0)
+            locator = baseparts.pop(0)
 
-            if basesp.scheme == "keep" and not arvados.util.keep_locator_pattern.match(pdh):
+            if (basesp.scheme == "keep" and
+                (not arvados.util.keep_locator_pattern.match(locator)) and
+                (not arvados.util.collection_uuid_pattern.match(locator))):
                 raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
 
             if urlsp.path.startswith("/"):
@@ -273,7 +279,7 @@ class CollectionFetcher(DefaultFetcher):
             if baseparts and urlsp.path:
                 baseparts.pop()
 
-            path = "/".join([pdh] + baseparts + urlparts)
+            path = "/".join([locator] + baseparts + urlparts)
             return urllib.parse.urlunsplit((basesp.scheme, "", path, "", urlsp.fragment))
 
         return super(CollectionFetcher, self).urljoin(base_url, url)
index e0445febdc9a0731314607417739747c8f0e632c..38135899dca7c66a10a135c85d2c0f8db43b0f2a 100644 (file)
@@ -58,6 +58,7 @@ class ArvPathMapper(PathMapper):
         self.name = name
         self.referenced_files = [r["location"] for r in referenced_files]
         self.single_collection = single_collection
+        self.pdh_to_uuid = {}
         super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
 
     def visit(self, srcobj, uploadfiles):
@@ -67,6 +68,8 @@ class ArvPathMapper(PathMapper):
 
         if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
             self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
+            if arvados_cwl.util.collectionUUID in srcobj:
+                self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
 
         debug = logger.isEnabledFor(logging.DEBUG)
 
index e515ac2ce5e99f4ec75011b8ac51bfe2fc1bbff8..9385bde63c4aa60b55de22d7c8b87908039444e3 100644 (file)
@@ -8,6 +8,7 @@ from future.utils import  viewvalues, viewitems
 
 import os
 import sys
+import re
 import urllib.parse
 from functools import partial
 import logging
@@ -30,8 +31,10 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
 from cwltool.builder import substitute
 from cwltool.pack import pack
+import schema_salad.validate as validate
 
 import arvados.collection
+from .util import collectionUUID
 import ruamel.yaml as yaml
 
 import arvados_cwl.arvdocker
@@ -87,6 +90,8 @@ def discover_secondary_files(inputs, job_order, discovered=None):
         if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
             setSecondary(t, job_order[shortname(t["id"])], discovered)
 
+collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
+collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run,
@@ -136,14 +141,55 @@ def upload_dependencies(arvrunner, name, document_loader,
                   loadref, urljoin=document_loader.fetcher.urljoin)
 
     sc = []
-    def only_real(obj):
-        # Only interested in local files than need to be uploaded,
-        # don't include file literals, keep references, etc.
-        sp = obj.get("location", "").split(":")
-        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
+    uuids = {}
+
+    def collect_uuids(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if sp[0] == "keep":
+            # Collect collection uuids that need to be resolved to
+            # portable data hashes
+            gp = collection_uuid_pattern.match(loc)
+            if gp:
+                uuids[gp.groups()[0]] = obj
+            if collectionUUID in obj:
+                uuids[obj[collectionUUID]] = obj
+
+    def collect_uploads(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if len(sp) < 1:
+            return
+        if sp[0] in ("file", "http", "https"):
+            # Record local files that need to be uploaded,
+            # don't include file literals, keep references, etc.
             sc.append(obj)
+        collect_uuids(obj)
 
-    visit_class(sc_result, ("File", "Directory"), only_real)
+    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+
+    # Resolve any collection uuids we found to portable data hashes
+    # and assign them to uuid_map
+    uuid_map = {}
+    fetch_uuids = list(uuids.keys())
+    while fetch_uuids:
+        # For a large number of fetch_uuids, the API server may limit
+        # the response size, so keep fetching until the API server has
+        # nothing more to give us.
+        lookups = arvrunner.api.collections().list(
+            filters=[["uuid", "in", fetch_uuids]],
+            count="none",
+            select=["uuid", "portable_data_hash"]).execute(
+                num_retries=arvrunner.num_retries)
+
+        if not lookups["items"]:
+            break
+
+        for l in lookups["items"]:
+            uuid_map[l["uuid"]] = l["portable_data_hash"]
+
+        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
@@ -194,8 +240,37 @@ def upload_dependencies(arvrunner, name, document_loader,
                            single_collection=True)
 
     def setloc(p):
-        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+        loc = p.get("location")
+        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
+            return
+
+        if not loc:
+            return
+
+        if collectionUUID in p:
+            uuid = p[collectionUUID]
+            if uuid not in uuid_map:
+                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                    "Collection uuid %s not found" % uuid)
+            gp = collection_pdh_pattern.match(loc)
+            if gp and uuid_map[uuid] != gp.groups()[0]:
+                # This file entry has both collectionUUID and a PDH
+                # location. If the PDH doesn't match the one returned
+                # by the API server, raise an error.
+                raise SourceLine(p, "location", validate.ValidationException).makeError(
+                    "Expected collection uuid %s to be %s but API server reported %s" % (
+                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
+        gp = collection_uuid_pattern.match(loc)
+        if not gp:
+            return
+        uuid = gp.groups()[0]
+        if uuid not in uuid_map:
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
+        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+        p[collectionUUID] = uuid
 
     visit_class(workflowobj, ("File", "Directory"), setloc)
     visit_class(discovered, ("File", "Directory"), setloc)
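
To make the new locator handling concrete (values taken from the test fixtures below): an input location such as keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/file1.txt is resolved through the collections API and rewritten as keep:99999999999999999999999999999998+99/file1.txt, with http://arvados.org/cwl#collectionUUID set to the original uuid. If an entry already carries both a collectionUUID and a PDH location and the PDH disagrees with what the API server reports for that uuid, upload_dependencies raises a validation error and the submission fails.
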
index 776fc6bc25dae06e232e2546cab501246d6cd6b3..85ae65ecf18c327aed6d9b3f2ddb5182dcc05b08 100644 (file)
@@ -5,6 +5,8 @@
 import datetime
 from arvados.errors import ApiError
 
+collectionUUID = "http://arvados.org/cwl#collectionUUID"
+
 def get_intermediate_collection_info(workflow_step_name, current_container, intermediate_output_ttl):
         if workflow_step_name:
             name = "Intermediate collection for step %s" % (workflow_step_name)
@@ -30,5 +32,5 @@ def get_current_container(api, num_retries=0, logger=None):
             if logger:
                 logger.info("Getting current container: %s", e)
             raise e
-            
+
     return current_container
diff --git a/sdk/cwl/tests/submit_test_job_with_inconsistent_uuids.json b/sdk/cwl/tests/submit_test_job_with_inconsistent_uuids.json
new file mode 100644 (file)
index 0000000..233a9fc
--- /dev/null
@@ -0,0 +1,25 @@
+{
+    "x": {
+        "class": "File",
+        "path": "input/blorp.txt"
+    },
+    "y": {
+        "class": "Directory",
+        "location": "keep:99999999999999999999999999999998+99",
+        "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+        "listing": [{
+            "class": "File",
+            "location": "keep:99999999999999999999999999999997+99/file1.txt",
+            "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
+        }]
+    },
+    "z": {
+        "class": "Directory",
+        "basename": "anonymous",
+        "listing": [{
+            "basename": "renamed.txt",
+            "class": "File",
+            "location": "keep:99999999999999999999999999999998+99/file1.txt"
+        }]
+    }
+}
diff --git a/sdk/cwl/tests/submit_test_job_with_mismatched_uuids.json b/sdk/cwl/tests/submit_test_job_with_mismatched_uuids.json
new file mode 100644 (file)
index 0000000..72eb911
--- /dev/null
@@ -0,0 +1,26 @@
+{
+    "x": {
+        "class": "File",
+        "path": "input/blorp.txt"
+    },
+    "y": {
+        "class": "Directory",
+        "location": "keep:99999999999999999999999999999998+99",
+        "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+        "listing": [{
+            "class": "File",
+            "location": "keep:99999999999999999999999999999998+99/file1.txt",
+            "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
+        }]
+    },
+    "z": {
+        "class": "Directory",
+        "basename": "anonymous",
+        "listing": [{
+            "basename": "renamed.txt",
+            "class": "File",
+            "location": "keep:99999999999999999999999999999998+99/file1.txt",
+            "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
+        }]
+    }
+}
diff --git a/sdk/cwl/tests/submit_test_job_with_uuids.json b/sdk/cwl/tests/submit_test_job_with_uuids.json
new file mode 100644 (file)
index 0000000..82d3e2d
--- /dev/null
@@ -0,0 +1,23 @@
+{
+    "x": {
+        "class": "File",
+        "path": "input/blorp.txt"
+    },
+    "y": {
+        "class": "Directory",
+        "location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz",
+        "listing": [{
+            "class": "File",
+            "location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/file1.txt"
+        }]
+    },
+    "z": {
+        "class": "Directory",
+        "basename": "anonymous",
+        "listing": [{
+            "basename": "renamed.txt",
+            "class": "File",
+            "location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/file1.txt"
+        }]
+    }
+}
index 1a57da3927a352e614f5a65ebb46887864ece07b..07d962bf9b55e2d33efab5b470b3146cab8c49f9 100644 (file)
@@ -80,7 +80,7 @@ class TestContainer(unittest.TestCase):
 
         return loadingContext, runtimeContext
 
-    # Helper function to set up the ArvCwlExecutor to use the containers api 
+    # Helper function to set up the ArvCwlExecutor to use the containers api
     # and test that the RuntimeStatusLoggingHandler is set up correctly
     def setup_and_test_container_executor_and_logging(self, gcc_mock) :
         api = mock.MagicMock()
@@ -96,7 +96,7 @@ class TestContainer(unittest.TestCase):
         handlerClasses = [h.__class__ for h in root_logger.handlers]
         self.assertTrue(arvados_cwl.RuntimeStatusLoggingHandler in handlerClasses)
         return runner
-        
+
     # The test passes no builder.resources
     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
@@ -527,9 +527,9 @@ class TestContainer(unittest.TestCase):
         # get_current_container is invoked when we call runtime_status_update
         # so try and log again!
         gcc_mock.side_effect = lambda *args: root_logger.error("Second Error")
-        try: 
+        try:
             root_logger.error("First Error")
-        except RuntimeError: 
+        except RuntimeError:
             self.fail("RuntimeStatusLoggingHandler should not be called recursively")
 
     @mock.patch("arvados_cwl.ArvCwlExecutor.runtime_status_update")
@@ -538,7 +538,7 @@ class TestContainer(unittest.TestCase):
     @mock.patch("arvados.collection.Collection")
     def test_child_failure(self, col, reader, gcc_mock, rts_mock):
         runner = self.setup_and_test_container_executor_and_logging(gcc_mock)
-        
+
         gcc_mock.return_value = {"uuid" : "zzzzz-dz642-zzzzzzzzzzzzzzz"}
         self.assertTrue(gcc_mock.called)
 
@@ -630,6 +630,7 @@ class TestContainer(unittest.TestCase):
             "p1": {
                 "class": "Directory",
                 "location": "keep:99999999999999999999999999999994+44",
+                "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
                 "listing": [
                     {
                         "class": "File",
@@ -660,7 +661,8 @@ class TestContainer(unittest.TestCase):
                     'mounts': {
                         "/keep/99999999999999999999999999999994+44": {
                             "kind": "collection",
-                            "portable_data_hash": "99999999999999999999999999999994+44"
+                            "portable_data_hash": "99999999999999999999999999999994+44",
+                            "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
                         },
                         '/tmp': {'kind': 'tmp',
                                  "capacity": 1073741824 },
index 39117d86e3ca976ffaa19e1e5596e37bf018b842..9535f6ba206bfadf267dda54bbad19fcecf0f961 100644 (file)
@@ -18,7 +18,16 @@ import mock
 import sys
 import unittest
 
-from io import BytesIO, StringIO
+from io import BytesIO
+
+# StringIO.StringIO and io.StringIO behave differently when write() is
+# called with both python2 (byte) strings and unicode strings
+# (specifically, there's some logging in cwltool that causes trouble).
+# This isn't a problem on python3 because all strings are unicode.
+if sys.version_info[0] < 3:
+    from StringIO import StringIO
+else:
+    from io import StringIO
 
 import arvados
 import arvados.collection
@@ -112,6 +121,11 @@ def stubs(func):
                 "portable_data_hash": "99999999999999999999999999999998+99",
                 "manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
             },
+            "99999999999999999999999999999997+99": {
+                "uuid": "",
+                "portable_data_hash": "99999999999999999999999999999997+99",
+                "manifest_text": ". 99999999999999999999999999999997+99 0:0:file1.txt"
+            },
             "99999999999999999999999999999994+99": {
                 "uuid": "",
                 "portable_data_hash": "99999999999999999999999999999994+99",
@@ -1425,6 +1439,96 @@ class TestSubmit(unittest.TestCase):
             stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
         self.assertEqual(exited, 1)
 
+    @mock.patch("arvados.collection.CollectionReader")
+    @stubs
+    def test_submit_uuid_inputs(self, stubs, collectionReader):
+        collectionReader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "file1.txt")
+        def list_side_effect(**kwargs):
+            m = mock.MagicMock()
+            if "count" in kwargs:
+                m.execute.return_value = {"items": [
+                    {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz", "portable_data_hash": "99999999999999999999999999999998+99"}
+                ]}
+            else:
+                m.execute.return_value = {"items": []}
+            return m
+        stubs.api.collections().list.side_effect = list_side_effect
+
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+                "tests/wf/submit_wf.cwl", "tests/submit_test_job_with_uuids.json"],
+            stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['y']['basename'] = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
+        expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['y']['http://arvados.org/cwl#collectionUUID'] = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
+        expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['z']['listing'][0]['http://arvados.org/cwl#collectionUUID'] = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
+
+        stubs.api.collections().list.assert_has_calls([
+            mock.call(count='none',
+                      filters=[['uuid', 'in', ['zzzzz-4zz18-zzzzzzzzzzzzzzz']]],
+                      select=['uuid', 'portable_data_hash'])])
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(stubs.capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+        self.assertEqual(exited, 0)
+
+    @stubs
+    def test_submit_mismatched_uuid_inputs(self, stubs):
+        def list_side_effect(**kwargs):
+            m = mock.MagicMock()
+            if "count" in kwargs:
+                m.execute.return_value = {"items": [
+                    {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz", "portable_data_hash": "99999999999999999999999999999997+99"}
+                ]}
+            else:
+                m.execute.return_value = {"items": []}
+            return m
+        stubs.api.collections().list.side_effect = list_side_effect
+
+        for infile in ("tests/submit_test_job_with_mismatched_uuids.json", "tests/submit_test_job_with_inconsistent_uuids.json"):
+            capture_stderr = StringIO()
+            cwltool_logger = logging.getLogger('cwltool')
+            stderr_logger = logging.StreamHandler(capture_stderr)
+            cwltool_logger.addHandler(stderr_logger)
+
+            try:
+                exited = arvados_cwl.main(
+                    ["--submit", "--no-wait", "--api=containers", "--debug",
+                        "tests/wf/submit_wf.cwl", infile],
+                    stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+                self.assertEqual(exited, 1)
+                self.assertRegexpMatches(
+                    capture_stderr.getvalue(),
+                    r"Expected collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz to be 99999999999999999999999999999998\+99 but API server reported 99999999999999999999999999999997\+99")
+            finally:
+                cwltool_logger.removeHandler(stderr_logger)
+
+    @mock.patch("arvados.collection.CollectionReader")
+    @stubs
+    def test_submit_unknown_uuid_inputs(self, stubs, collectionReader):
+        collectionReader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "file1.txt")
+        capture_stderr = StringIO()
+
+        cwltool_logger = logging.getLogger('cwltool')
+        stderr_logger = logging.StreamHandler(capture_stderr)
+        cwltool_logger.addHandler(stderr_logger)
+
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+                "tests/wf/submit_wf.cwl", "tests/submit_test_job_with_uuids.json"],
+            stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+        try:
+            self.assertEqual(exited, 1)
+            self.assertRegexpMatches(
+                capture_stderr.getvalue(),
+                r"Collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz not found")
+        finally:
+            cwltool_logger.removeHandler(stderr_logger)
+
 
 class TestCreateTemplate(unittest.TestCase):
     existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
@@ -1609,22 +1713,24 @@ class TestCreateWorkflow(unittest.TestCase):
 
     @stubs
     def test_incompatible_api(self, stubs):
-        capture_stderr = io.StringIO()
+        capture_stderr = StringIO()
         acr_logger = logging.getLogger('arvados.cwl-runner')
         stderr_logger = logging.StreamHandler(capture_stderr)
         acr_logger.addHandler(stderr_logger)
 
-        exited = arvados_cwl.main(
-            ["--update-workflow", self.existing_workflow_uuid,
-             "--api=jobs",
-             "--debug",
-             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
-            sys.stderr, sys.stderr, api_client=stubs.api)
-        self.assertEqual(exited, 1)
-        self.assertRegexpMatches(
-            capture_stderr.getvalue(),
-            "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
-        acr_logger.removeHandler(stderr_logger)
+        try:
+            exited = arvados_cwl.main(
+                ["--update-workflow", self.existing_workflow_uuid,
+                 "--api=jobs",
+                 "--debug",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                sys.stderr, sys.stderr, api_client=stubs.api)
+            self.assertEqual(exited, 1)
+            self.assertRegexpMatches(
+                capture_stderr.getvalue(),
+                "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
+        finally:
+            acr_logger.removeHandler(stderr_logger)
 
     @stubs
     def test_update(self, stubs):
index 787e01ab8f7dc8be892e7c754bca4a29cba84b13..37ff4d818a2ee33d1930a4842927f6a4994e33b7 100644 (file)
@@ -69,6 +69,21 @@ var InsecureHTTPClient = &http.Client{
 var DefaultSecureClient = &http.Client{
        Timeout: 5 * time.Minute}
 
+// NewClientFromConfig creates a new Client that uses the endpoints in
+// the given cluster.
+//
+// AuthToken is left empty for the caller to populate.
+func NewClientFromConfig(cluster *Cluster) (*Client, error) {
+       ctrlURL := cluster.Services.Controller.ExternalURL
+       if ctrlURL.Host == "" {
+               return nil, fmt.Errorf("no host in config Services.Controller.ExternalURL: %v", ctrlURL)
+       }
+       return &Client{
+               APIHost:  fmt.Sprintf("%v", ctrlURL),
+               Insecure: cluster.TLS.Insecure,
+       }, nil
+}
+
 // NewClientFromEnv creates a new Client that uses the default HTTP
 // client with the API endpoint and credentials given by the
 // ARVADOS_API_* environment variables.
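
A usage sketch for NewClientFromConfig (illustrative values, not part of this patch; assumes the usual log and os imports; the Services and TLS fields used here are added in the sdk/go/arvados/config.go changes below):

    // Sketch: build a client from an already-loaded cluster config.
    cluster := &arvados.Cluster{TLS: arvados.TLS{Insecure: true}}
    cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: "zzzzz.example.com"}

    client, err := arvados.NewClientFromConfig(cluster)
    if err != nil {
        log.Fatal(err)
    }
    client.AuthToken = os.Getenv("ARVADOS_API_TOKEN") // the constructor leaves the token empty
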
index f16f98a943cdbe2f0501a35d95cb3e45e9c9d5a9..2965d5ecb0dc8aa89da2354eea231464d9fa202f 100644 (file)
@@ -8,6 +8,7 @@ import (
        "encoding/json"
        "errors"
        "fmt"
+       "net/url"
        "os"
 
        "git.curoverse.com/arvados.git/sdk/go/config"
@@ -58,6 +59,8 @@ type RequestLimits struct {
 type Cluster struct {
        ClusterID          string `json:"-"`
        ManagementToken    string
+       SystemRootToken    string
+       Services           Services
        NodeProfiles       map[string]NodeProfile
        InstanceTypes      InstanceTypeMap
        CloudVMs           CloudVMs
@@ -67,8 +70,43 @@ type Cluster struct {
        PostgreSQL         PostgreSQL
        RequestLimits      RequestLimits
        Logging            Logging
+       TLS                TLS
 }
 
+type Services struct {
+       Controller    Service
+       DispatchCloud Service
+       Health        Service
+       Keepbalance   Service
+       Keepproxy     Service
+       Keepstore     Service
+       Keepweb       Service
+       Nodemanager   Service
+       RailsAPI      Service
+       Websocket     Service
+       Workbench     Service
+}
+
+type Service struct {
+       InternalURLs map[URL]ServiceInstance
+       ExternalURL  URL
+}
+
+// URL is a url.URL that is also usable as a JSON key/value.
+type URL url.URL
+
+// UnmarshalText implements encoding.TextUnmarshaler so URL can be
+// used as a JSON key/value.
+func (su *URL) UnmarshalText(text []byte) error {
+       u, err := url.Parse(string(text))
+       if err == nil {
+               *su = URL(*u)
+       }
+       return err
+}
+
+type ServiceInstance struct{}
+
 type Logging struct {
        Level  string
        Format string
@@ -122,6 +160,12 @@ type Dispatch struct {
 
        // Maximum total worker probes per second
        MaxProbesPerSecond int
+
+       // Time before repeating SIGTERM when killing a container
+       TimeoutSignal Duration
+
+       // Time to give up on SIGTERM and write off the worker
+       TimeoutTERM Duration
 }
 
 type CloudVMs struct {
@@ -148,6 +192,9 @@ type CloudVMs struct {
        // Time after shutdown to retry shutdown
        TimeoutShutdown Duration
 
+       // Maximum create/destroy-instance operations per second
+       MaxCloudOpsPerSecond int
+
        ImageID string
 
        Driver           string
@@ -300,3 +347,9 @@ type SystemServiceInstance struct {
        TLS      bool
        Insecure bool
 }
+
+type TLS struct {
+       Certificate string
+       Key         string
+       Insecure    bool
+}
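
Because URL implements encoding.TextUnmarshaler, it works both as a scalar value (ExternalURL) and as a map key (InternalURLs) when a config is decoded. A rough sketch with made-up endpoints (JSON shown for brevity):

    // Sketch: decode a Services stanza; URL map keys go through UnmarshalText.
    var svcs arvados.Services
    err := json.Unmarshal([]byte(`{
        "Controller": {
            "ExternalURL": "https://zzzzz.example.com",
            "InternalURLs": {"http://localhost:8003/": {}}
        }
    }`), &svcs)
    if err != nil {
        panic(err)
    }
    fmt.Println(svcs.Controller.ExternalURL.Host) // zzzzz.example.com
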
index 2392fcde7bdc7475068bfac3452665daa2ef5a61..62936e71831fb1fa055b213fac470f2adeb5ea16 100644 (file)
@@ -6,6 +6,7 @@ package keepclient
 
 import (
        "encoding/json"
+       "errors"
        "fmt"
        "log"
        "os"
@@ -150,7 +151,12 @@ func (kc *KeepClient) discoverServices() error {
        }
        svcListCacheMtx.Unlock()
 
-       return kc.loadKeepServers(<-cacheEnt.latest)
+       select {
+       case <-time.After(time.Minute):
+               return errors.New("timed out while getting initial list of keep services")
+       case sl := <-cacheEnt.latest:
+               return kc.loadKeepServers(sl)
+       }
 }
 
 func (kc *KeepClient) RefreshServiceDiscovery() {
index da919309f4e829f227f3241eb7d41759087dde08..d4f04eb370659fac2ade716e302a301ea7494577 100644 (file)
@@ -29,7 +29,7 @@ Gem::Specification.new do |s|
   s.add_dependency('andand', '~> 1.3', '>= 1.3.3')
   # Our google-api-client dependency used to be < 0.9, but that could be
   # satisfied by the buggy 0.9.pre*.  https://dev.arvados.org/issues/9213
-  s.add_dependency('cure-google-api-client', '>= 0.7', '< 0.8.9')
+  s.add_dependency('arvados-google-api-client', '>= 0.7', '< 0.8.9')
   # work around undeclared dependency on i18n in some activesupport 3.x.x:
   s.add_dependency('i18n', '~> 0')
   s.add_dependency('json', '>= 1.7.7', '<3')
index 6b1bf795ed7eced1e872809358150c324e234239..cf0261585a9a77cdace3268918f9f4c5614bff5c 100644 (file)
@@ -122,11 +122,28 @@ class GroupsTest < ActionDispatch::IntegrationTest
     assert_includes coll_uuids, collections(:foo_collection_in_aproject).uuid
     assert_not_includes coll_uuids, collections(:expired_collection).uuid
   end
+end
+
+class NonTransactionalGroupsTest < ActionDispatch::IntegrationTest
+  # Transactional tests are disabled to be able to test the concurrent
+  # asynchronous permissions update feature.
+  # This is needed because nested transactions share the connection pool, so
+  # one thread is locked while trying to talk to the database, until the other
+  # one finishes.
+  self.use_transactional_fixtures = false
+
+  teardown do
+    # Explicitly reset the database after each test.
+    post '/database/reset', {}, auth(:admin)
+    assert_response :success
+  end
 
   test "create request with async=true defers permissions update" do
-    Rails.configuration.async_permissions_update_interval = 1 # seconds
+    Rails.configuration.async_permissions_update_interval = 1 # second
     name = "Random group #{rand(1000)}"
     assert_equal nil, Group.find_by_name(name)
+
+    # Trigger the asynchronous permission update by using the async=true parameter.
     post "/arvados/v1/groups", {
       group: {
         name: name
@@ -134,8 +151,9 @@ class GroupsTest < ActionDispatch::IntegrationTest
       async: true
     }, auth(:active)
     assert_response 202
-    g = Group.find_by_name(name)
-    assert_not_nil g
+
+    # The group exists in the database, but it's not accessible yet.
+    assert_not_nil Group.find_by_name(name)
     get "/arvados/v1/groups", {
       filters: [["name", "=", name]].to_json,
       limit: 10
@@ -143,10 +161,8 @@ class GroupsTest < ActionDispatch::IntegrationTest
     assert_response 200
     assert_equal 0, json_response['items_available']
 
-    # Unblock the thread doing the permissions update
-    ActiveRecord::Base.clear_active_connections!
-
-    sleep(3)
+    # Wait a bit and try again.
+    sleep(1)
     get "/arvados/v1/groups", {
       filters: [["name", "=", name]].to_json,
       limit: 10
index b3c530e69013e91dfb7599c677347ecef3856d78..852ccb6ece3979385423f3ceb55fb437f164c6aa 100644 (file)
@@ -20,6 +20,7 @@ var (
        lockdir    = "/var/lock"
        lockprefix = "crunch-run-"
        locksuffix = ".lock"
+       brokenfile = "crunch-run-broken"
 )
 
 // procinfo is saved in each process's lockfile.
@@ -116,14 +117,21 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
 
        proc, err := os.FindProcess(pi.PID)
        if err != nil {
+               // FindProcess should have succeeded, even if the
+               // process does not exist.
                return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
        }
 
+       // Send the requested signal once, then send signal 0 a few
+       // times.  When proc.Signal() returns an error (process no
+       // longer exists), return success.  If that doesn't happen
+       // within 1 second, return an error.
        err = proc.Signal(signal)
        for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
                err = proc.Signal(syscall.Signal(0))
        }
        if err == nil {
+               // Reached deadline without a proc.Signal() error.
                return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
        }
        fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
@@ -139,7 +147,10 @@ func ListProcesses(stdout, stderr io.Writer) int {
                if info.IsDir() && path != walkdir {
                        return filepath.SkipDir
                }
-               if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
+               if name := info.Name(); name == brokenfile {
+                       fmt.Fprintln(stdout, "broken")
+                       return nil
+               } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
                        return nil
                }
                if info.Size() == 0 {
index 0576337aa13c280841187db3a7aea2dcf4af65c0..3925b0b7b1f810c9c451c7e756693ba5875bc252 100644 (file)
@@ -222,7 +222,14 @@ var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
        if *brokenNodeHook == "" {
-               runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+               path := filepath.Join(lockdir, brokenfile)
+               runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
+               f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
+               if err != nil {
+                       runner.CrunchLog.Printf("Error writing %s: %s", path, err)
+                       return
+               }
+               f.Close()
        } else {
                runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
                // run killme script
index 17e5e145811aba3e587a66d07fb642ec07bef2d8..60729c019b1a1c508cacceb5b4e7d08e8d300bc5 100644 (file)
@@ -2049,7 +2049,7 @@ func (s *TestSuite) TestFullBrokenDocker2(c *C) {
 
        c.Check(api.CalledWith("container.state", "Queued"), NotNil)
        c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
 }
 
 func (s *TestSuite) TestFullBrokenDocker3(c *C) {
index f1e49f5afcffff32143b9033c5f83dddcd0c7c65..b37ef695bde6708cdb8fd2bc7d3117275920460e 100644 (file)
@@ -71,6 +71,7 @@ import ciso8601
 import collections
 import functools
 import arvados.keep
+from prometheus_client import Summary
 
 import Queue
 
@@ -351,6 +352,27 @@ class Operations(llfuse.Operations):
 
     """
 
+    fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
+    read_time = fuse_time.labels(op='read')
+    write_time = fuse_time.labels(op='write')
+    destroy_time = fuse_time.labels(op='destroy')
+    on_event_time = fuse_time.labels(op='on_event')
+    getattr_time = fuse_time.labels(op='getattr')
+    setattr_time = fuse_time.labels(op='setattr')
+    lookup_time = fuse_time.labels(op='lookup')
+    forget_time = fuse_time.labels(op='forget')
+    open_time = fuse_time.labels(op='open')
+    release_time = fuse_time.labels(op='release')
+    opendir_time = fuse_time.labels(op='opendir')
+    readdir_time = fuse_time.labels(op='readdir')
+    statfs_time = fuse_time.labels(op='statfs')
+    create_time = fuse_time.labels(op='create')
+    mkdir_time = fuse_time.labels(op='mkdir')
+    unlink_time = fuse_time.labels(op='unlink')
+    rmdir_time = fuse_time.labels(op='rmdir')
+    rename_time = fuse_time.labels(op='rename')
+    flush_time = fuse_time.labels(op='flush')
+
     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
         super(Operations, self).__init__()
 
@@ -391,6 +413,28 @@ class Operations(llfuse.Operations):
         # initializing to continue
         self.initlock.set()
 
+    def metric_samples(self):
+        return self.fuse_time.collect()[0].samples
+
+    def metric_op_names(self):
+        ops = []
+        for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
+            if cur_op not in ops:
+                ops.append(cur_op)
+        return ops
+
+    def metric_value(self, opname, metric):
+        op_value = [sample.value for sample in self.metric_samples()
+                    if sample.name == metric and sample.labels['op'] == opname]
+        return op_value[0] if len(op_value) == 1 else None
+
+    def metric_sum_func(self, opname):
+        return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
+
+    def metric_count_func(self, opname):
+        return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
+
+    @destroy_time.time()
     @catch_exceptions
     def destroy(self):
         self._shutdown_started.set()
@@ -417,6 +461,7 @@ class Operations(llfuse.Operations):
             [["event_type", "in", ["create", "update", "delete"]]],
             self.on_event)
 
+    @on_event_time.time()
     @catch_exceptions
     def on_event(self, ev):
         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
@@ -447,6 +492,7 @@ class Operations(llfuse.Operations):
                     self.inodes.inode_cache.find_by_uuid(newowner)):
                 parent.child_event(ev)
 
+    @getattr_time.time()
     @catch_exceptions
     def getattr(self, inode, ctx=None):
         if inode not in self.inodes:
@@ -493,6 +539,7 @@ class Operations(llfuse.Operations):
 
         return entry
 
+    @setattr_time.time()
     @catch_exceptions
     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
         entry = self.getattr(inode)
@@ -516,6 +563,7 @@ class Operations(llfuse.Operations):
 
         return entry
 
+    @lookup_time.time()
     @catch_exceptions
     def lookup(self, parent_inode, name, ctx=None):
         name = unicode(name, self.inodes.encoding)
@@ -542,6 +590,7 @@ class Operations(llfuse.Operations):
                       parent_inode, name)
             raise llfuse.FUSEError(errno.ENOENT)
 
+    @forget_time.time()
     @catch_exceptions
     def forget(self, inodes):
         if self._shutdown_started.is_set():
@@ -552,6 +601,7 @@ class Operations(llfuse.Operations):
             if ent.dec_ref(nlookup) == 0 and ent.dead:
                 self.inodes.del_entry(ent)
 
+    @open_time.time()
     @catch_exceptions
     def open(self, inode, flags, ctx=None):
         if inode in self.inodes:
@@ -587,6 +637,7 @@ class Operations(llfuse.Operations):
 
         return fh
 
+    @read_time.time()
     @catch_exceptions
     def read(self, fh, off, size):
         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
@@ -604,6 +655,7 @@ class Operations(llfuse.Operations):
             self.read_counter.add(len(r))
         return r
 
+    @write_time.time()
     @catch_exceptions
     def write(self, fh, off, buf):
         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
@@ -624,6 +676,7 @@ class Operations(llfuse.Operations):
             self.write_counter.add(w)
         return w
 
+    @release_time.time()
     @catch_exceptions
     def release(self, fh):
         if fh in self._filehandles:
@@ -640,6 +693,7 @@ class Operations(llfuse.Operations):
     def releasedir(self, fh):
         self.release(fh)
 
+    @opendir_time.time()
     @catch_exceptions
     def opendir(self, inode, ctx=None):
         _logger.debug("arv-mount opendir: inode %i", inode)
@@ -664,6 +718,7 @@ class Operations(llfuse.Operations):
         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
         return fh
 
+    @readdir_time.time()
     @catch_exceptions
     def readdir(self, fh, off):
         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
@@ -679,6 +734,7 @@ class Operations(llfuse.Operations):
                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
             e += 1
 
+    @statfs_time.time()
     @catch_exceptions
     def statfs(self, ctx=None):
         st = llfuse.StatvfsData()
@@ -712,6 +768,7 @@ class Operations(llfuse.Operations):
 
         return p
 
+    @create_time.time()
     @catch_exceptions
     def create(self, inode_parent, name, mode, flags, ctx=None):
         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
@@ -728,6 +785,7 @@ class Operations(llfuse.Operations):
         f.inc_ref()
         return (fh, self.getattr(f.inode))
 
+    @mkdir_time.time()
     @catch_exceptions
     def mkdir(self, inode_parent, name, mode, ctx=None):
         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
@@ -741,18 +799,21 @@ class Operations(llfuse.Operations):
         d.inc_ref()
         return self.getattr(d.inode)
 
+    @unlink_time.time()
     @catch_exceptions
     def unlink(self, inode_parent, name, ctx=None):
         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.unlink(name)
 
+    @rmdir_time.time()
     @catch_exceptions
     def rmdir(self, inode_parent, name, ctx=None):
         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.rmdir(name)
 
+    @rename_time.time()
     @catch_exceptions
     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
@@ -760,6 +821,7 @@ class Operations(llfuse.Operations):
         dest = self._check_writable(inode_parent_new)
         dest.rename(name_old, name_new, src)
 
+    @flush_time.time()
     @catch_exceptions
     def flush(self, fh):
         if fh in self._filehandles:
index 47d89d8b9594a2befeff49a8ef2b13e89950b2ce..e99573752e1c0e69be8f2e07201530bf56a4024b 100644 (file)
@@ -4,64 +4,77 @@
 
 import sys
 import time
+from collections import namedtuple
 
-class Stat(object):
-    def __init__(self, prefix, interval,
-                 egr_name, ing_name,
-                 egr_func, ing_func):
+Stat = namedtuple("Stat", ['name', 'get'])
+
+class StatWriter(object):
+    def __init__(self, prefix, interval, stats):
         self.prefix = prefix
         self.interval = interval
-        self.egr_name = egr_name
-        self.ing_name = ing_name
-        self.egress = egr_func
-        self.ingress = ing_func
-        self.egr_prev = self.egress()
-        self.ing_prev = self.ingress()
-
-    def update(self):
-        egr = self.egress()
-        ing = self.ingress()
+        self.stats = stats
+        self.previous_stats = []
+        self.update_previous_stats()
 
-        delta = " -- interval %.4f seconds %d %s %d %s" % (self.interval,
-                                                           egr - self.egr_prev,
-                                                           self.egr_name,
-                                                           ing - self.ing_prev,
-                                                           self.ing_name)
+    def update_previous_stats(self):
+        self.previous_stats = [stat.get() for stat in self.stats]
 
-        sys.stderr.write("crunchstat: %s %d %s %d %s%s\n" % (self.prefix,
-                                                             egr,
-                                                             self.egr_name,
-                                                             ing,
-                                                             self.ing_name,
-                                                             delta))
+    def update(self):
+        def append_by_type(string, name, value):
+            if type(value) is float:
+                string += " %.6f %s" % (value, name)
+            else:
+                string += " %s %s" % (str(value), name)
+            return string
 
-        self.egr_prev = egr
-        self.ing_prev = ing
+        out = "crunchstat: %s" % self.prefix
+        delta = "-- interval %.4f seconds" % self.interval
+        for i, stat in enumerate(self.stats):
+            value = stat.get()
+            diff = value - self.previous_stats[i]
+            delta = append_by_type(delta, stat.name, diff)
+            out = append_by_type(out, stat.name, value)
 
+        sys.stderr.write("%s %s\n" % (out, delta))
+        self.update_previous_stats()
 
 def statlogger(interval, keep, ops):
-    calls = Stat("keepcalls", interval, "put", "get",
-                 keep.put_counter.get,
-                 keep.get_counter.get)
-    net = Stat("net:keep0", interval, "tx", "rx",
-               keep.upload_counter.get,
-               keep.download_counter.get)
-    cache = Stat("keepcache", interval, "hit", "miss",
-               keep.hits_counter.get,
-               keep.misses_counter.get)
-    fuseops = Stat("fuseops", interval,"write", "read",
-                   ops.write_ops_counter.get,
-                   ops.read_ops_counter.get)
-    blk = Stat("blkio:0:0", interval, "write", "read",
-               ops.write_counter.get,
-               ops.read_counter.get)
+    calls = StatWriter("keepcalls", interval, [
+        Stat("put", keep.put_counter.get),
+        Stat("get", keep.get_counter.get)
+    ])
+    net = StatWriter("net:keep0", interval, [
+        Stat("tx", keep.upload_counter.get),
+        Stat("rx", keep.download_counter.get)
+    ])
+    cache = StatWriter("keepcache", interval, [
+        Stat("hit", keep.hits_counter.get),
+        Stat("miss", keep.misses_counter.get)
+    ])
+    fuseops = StatWriter("fuseops", interval, [
+        Stat("write", ops.write_ops_counter.get),
+        Stat("read", ops.read_ops_counter.get)
+    ])
+    fusetimes = []
+    for cur_op in ops.metric_op_names():
+        name = "fuseop:{0}".format(cur_op)
+        fusetimes.append(StatWriter(name, interval, [
+            Stat("count", ops.metric_count_func(cur_op)),
+            Stat("time", ops.metric_sum_func(cur_op))
+        ]))
+    blk = StatWriter("blkio:0:0", interval, [
+        Stat("write", ops.write_counter.get),
+        Stat("read", ops.read_counter.get)
+    ])
 
     while True:
         time.sleep(interval)
         calls.update()
         net.update()
         cache.update()
-        fuseops.update()
         blk.update()
+        fuseops.update()
+        for ftime in fusetimes:
+            ftime.update()
 
 
index 9b4b997cdc68dd14353e4621a77da7f30f7146a1..cbc9cb23edf5f36f0c6a83120d8c21cc0f582241 100644 (file)
@@ -43,7 +43,8 @@ setup(name='arvados_fuse',
         'llfuse >=1.2, <1.3.4',
         'python-daemon',
         'ciso8601 >= 2.0.0',
-        'setuptools'
+        'setuptools',
+        "prometheus_client"
         ],
       extras_require={
           ':python_version<"3"': ['pytz'],
index 66aebf80d4236d6b575a178641bd5eae8c14a1bc..8c443fd71afd3ddb2aee089df9bcb745b1c3315d 100755 (executable)
@@ -118,6 +118,23 @@ wait_for_arvbox() {
     fi
 }
 
+docker_run_dev() {
+    docker run \
+          "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
+           "--volume=$SSO_ROOT:/usr/src/sso:rw" \
+           "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
+           "--volume=$WORKBENCH2_ROOT:/usr/src/workbench2:rw" \
+           "--volume=$PG_DATA:/var/lib/postgresql:rw" \
+           "--volume=$VAR_DATA:/var/lib/arvados:rw" \
+           "--volume=$PASSENGER:/var/lib/passenger:rw" \
+           "--volume=$GEMS:/var/lib/gems:rw" \
+           "--volume=$PIPCACHE:/var/lib/pip:rw" \
+           "--volume=$NPMCACHE:/var/lib/npm:rw" \
+           "--volume=$GOSTUFF:/var/lib/gopath:rw" \
+           "--volume=$RLIBS:/var/lib/Rlibs:rw" \
+          "$@"
+}
+
 run() {
     CONFIG=$1
     TAG=$2
@@ -220,22 +237,10 @@ run() {
             mkdir -p $VAR_DATA/test
 
             if test "$need_setup" = 1 ; then
-                docker run \
+                docker_run_dev \
                        --detach \
                        --name=$ARVBOX_CONTAINER \
                        --privileged \
-                       "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
-                       "--volume=$SSO_ROOT:/usr/src/sso:rw" \
-                       "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
-                       "--volume=$WORKBENCH2_ROOT:/usr/src/workbench2:rw" \
-                       "--volume=$PG_DATA:/var/lib/postgresql:rw" \
-                       "--volume=$VAR_DATA:/var/lib/arvados:rw" \
-                       "--volume=$PASSENGER:/var/lib/passenger:rw" \
-                       "--volume=$GEMS:/var/lib/gems:rw" \
-                       "--volume=$PIPCACHE:/var/lib/pip:rw" \
-                       "--volume=$NPMCACHE:/var/lib/npm:rw" \
-                       "--volume=$GOSTUFF:/var/lib/gopath:rw" \
-                       "--volume=$RLIBS:/var/lib/Rlibs:rw" \
                       "--env=SVDIR=/etc/test-service" \
                        arvados/arvbox-dev$TAG
 
@@ -264,22 +269,10 @@ run() {
                    GEM_HOME=/var/lib/gems \
                    "$@"
         elif echo "$CONFIG" | grep 'dev$' ; then
-            docker run \
+            docker_run_dev \
                    --detach \
                    --name=$ARVBOX_CONTAINER \
                    --privileged \
-                   "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
-                   "--volume=$SSO_ROOT:/usr/src/sso:rw" \
-                   "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
-                   "--volume=$WORKBENCH2_ROOT:/usr/src/workbench2:rw" \
-                   "--volume=$PG_DATA:/var/lib/postgresql:rw" \
-                   "--volume=$VAR_DATA:/var/lib/arvados:rw" \
-                   "--volume=$PASSENGER:/var/lib/passenger:rw" \
-                   "--volume=$GEMS:/var/lib/gems:rw" \
-                   "--volume=$PIPCACHE:/var/lib/pip:rw" \
-                   "--volume=$NPMCACHE:/var/lib/npm:rw" \
-                   "--volume=$GOSTUFF:/var/lib/gopath:rw" \
-                   "--volume=$RLIBS:/var/lib/Rlibs:rw" \
                    $PUBLIC \
                    arvados/arvbox-dev$TAG
             updateconf
@@ -494,6 +487,46 @@ case "$subcmd" in
         fi
         ;;
 
+    install-root-cert)
+       set -x
+       sudo cp $VAR_DATA/root-cert.pem /usr/local/share/ca-certificates/${ARVBOX_CONTAINER}-testing-cert.crt
+       sudo update-ca-certificates
+       ;;
+
+    devenv)
+       set -x
+       if docker ps -a --filter "status=exited" | grep -E "${ARVBOX_CONTAINER}-devenv$" -q ; then
+           docker start ${ARVBOX_CONTAINER}-devenv
+       elif ! (docker ps -a --filter "status=running" | grep -E "${ARVBOX_CONTAINER}-devenv$" -q) ; then
+           docker_run_dev \
+                 --detach \
+                --name=${ARVBOX_CONTAINER}-devenv \
+                "--env=SVDIR=/etc/devenv-service" \
+                "--volume=$HOME:$HOME:rw" \
+                --volume=/tmp/.X11-unix:/tmp/.X11-unix:rw \
+                arvados/arvbox-dev$TAG
+       fi
+
+       exec docker exec --interactive --tty \
+            -e LINES=$(tput lines) \
+            -e COLUMNS=$(tput cols) \
+            -e TERM=$TERM \
+            -e "ARVBOX_HOME=$HOME" \
+            -e "DISPLAY=$DISPLAY" \
+            --workdir=$PWD \
+            ${ARVBOX_CONTAINER}-devenv \
+            /usr/local/lib/arvbox/devenv.sh "$@"
+       ;;
+
+    devenv-stop)
+       docker stop ${ARVBOX_CONTAINER}-devenv
+       ;;
+
+    devenv-reset)
+       docker stop ${ARVBOX_CONTAINER}-devenv
+       docker rm ${ARVBOX_CONTAINER}-devenv
+       ;;
+
     *)
         echo "Arvados-in-a-box                      http://arvados.org"
         echo
index 162edc927fe04a566422a53aca7735ee1bd31096..1949af435bd2de82c3c9e2398ce58fa873477035 100644 (file)
@@ -20,7 +20,7 @@ RUN apt-get update && \
     linkchecker python3-virtualenv python-virtualenv xvfb iceweasel \
     libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr \
     libsecret-1-dev r-base r-cran-testthat libxml2-dev pandoc \
-    python3-setuptools python3-pip && \
+    python3-setuptools python3-pip openjdk-8-jdk && \
     apt-get clean
 
 ENV RUBYVERSION_MINOR 2.3
@@ -40,7 +40,7 @@ ENV GEM_HOME /var/lib/gems
 ENV GEM_PATH /var/lib/gems
 ENV PATH $PATH:/var/lib/gems/bin
 
-ENV GOVERSION 1.10.1
+ENV GOVERSION 1.11.5
 
 # Install golang binary
 RUN curl -f http://storage.googleapis.com/golang/go${GOVERSION}.linux-amd64.tar.gz | \
@@ -79,7 +79,7 @@ RUN set -e && curl -L -f ${GDURL} | tar -C /usr/local/bin -xzf - geckodriver
 
 RUN pip install -U setuptools
 
-ENV NODEVERSION v6.11.4
+ENV NODEVERSION v8.15.1
 
 # Install nodejs binary
 RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
@@ -100,7 +100,7 @@ ADD crunch-setup.sh gitolite.rc \
     keep-setup.sh common.sh createusers.sh \
     logger runsu.sh waitforpostgres.sh \
     yml_override.py api-setup.sh \
-    go-setup.sh \
+    go-setup.sh devenv.sh \
     /usr/local/lib/arvbox/
 
 ADD runit /etc/runit
index e6e0397b99a1a3c7095f1b23e4d571edfcb915dd..bb0ff76fe8f065c1be45338f677cf0e7cd99b8ed 100644 (file)
@@ -13,3 +13,4 @@ RUN echo "development" > /var/lib/arvados/sso_rails_env
 RUN echo "development" > /var/lib/arvados/workbench_rails_env
 
 RUN mkdir /etc/test-service && ln -sf /var/lib/arvbox/service/postgres /etc/test-service
+RUN mkdir /etc/devenv-service
\ No newline at end of file
index bbd11f03416a9783904a48cb6823136ceb5c0686..36ff49db51b3b011dc6dea0346c530cb27cb6dd9 100644 (file)
@@ -9,6 +9,7 @@ export GEM_PATH=/var/lib/gems
 export npm_config_cache=/var/lib/npm
 export npm_config_cache_min=Infinity
 export R_LIBS=/var/lib/Rlibs
+export HOME=$(getent passwd arvbox | cut -d: -f6)
 
 if test -s /var/run/localip_override ; then
     localip=$(cat /var/run/localip_override)
index a4689f004aeebab31c71f0dbe35833ec99df7b89..e9721fd55d87c1e5a597f3a56b632310321bb8d1 100755 (executable)
@@ -13,9 +13,13 @@ if ! grep "^arvbox:" /etc/passwd >/dev/null 2>/dev/null ; then
           /var/lib/passenger /var/lib/gopath \
           /var/lib/pip /var/lib/npm
 
+    if test -z "$ARVBOX_HOME" ; then
+       ARVBOX_HOME=/var/lib/arvados
+    fi
+
     groupadd --gid $HOSTGID --non-unique arvbox
     groupadd --gid $HOSTGID --non-unique git
-    useradd --home-dir /var/lib/arvados \
+    useradd --home-dir $ARVBOX_HOME \
             --uid $HOSTUID --gid $HOSTGID \
             --non-unique \
             --groups docker \
@@ -36,6 +40,17 @@ if ! grep "^arvbox:" /etc/passwd >/dev/null 2>/dev/null ; then
     chown crunch:crunch -R /tmp/crunch0 /tmp/crunch1
 
     echo "arvbox    ALL=(crunch) NOPASSWD: ALL" >> /etc/sudoers
+
+    cat <<EOF > /etc/profile.d/paths.sh
+export PATH=/usr/local/bin:/usr/bin:/bin:/usr/local/go/bin:/var/lib/gems/bin:$(ls -d /usr/local/node-*)/bin
+export GEM_HOME=/var/lib/gems
+export GEM_PATH=/var/lib/gems
+export npm_config_cache=/var/lib/npm
+export npm_config_cache_min=Infinity
+export R_LIBS=/var/lib/Rlibs
+export GOPATH=/var/lib/gopath
+EOF
+
 fi
 
 if ! grep "^fuse:" /etc/group >/dev/null 2>/dev/null ; then
diff --git a/tools/arvbox/lib/arvbox/docker/devenv.sh b/tools/arvbox/lib/arvbox/docker/devenv.sh
new file mode 100755 (executable)
index 0000000..9ab3ac4
--- /dev/null
@@ -0,0 +1,12 @@
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+flock /var/lib/arvados/createusers.lock /usr/local/lib/arvbox/createusers.sh
+
+if [[ -n "$*" ]] ; then
+    exec su --preserve-environment arvbox -c "$*"
+else
+    exec su --login arvbox
+fi
deleted file mode 120000 (symlink)
index a388c8b67bf16bbb16601007540e58f1372ebc85..0000000000000000000000000000000000000000
+++ /dev/null
@@ -1 +0,0 @@
-/usr/local/lib/arvbox/runsu.sh
\ No newline at end of file
new file mode 100755 (executable)
index 0000000000000000000000000000000000000000..4014c5c8b040316c4850df4d788476854d06527c
--- /dev/null
@@ -0,0 +1,126 @@
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+exec 2>&1
+set -ex -o pipefail
+
+. /usr/local/lib/arvbox/common.sh
+
+cat <<EOF >/var/lib/arvados/nginx.conf
+worker_processes auto;
+pid /var/lib/arvados/nginx.pid;
+
+error_log stderr;
+daemon off;
+user arvbox;
+
+events {
+       worker_connections 64;
+}
+
+http {
+     access_log off;
+     include /etc/nginx/mime.types;
+     default_type application/octet-stream;
+     server {
+            listen ${services[doc]} default_server;
+            listen [::]:${services[doc]} default_server;
+            root /usr/src/arvados/doc/.site;
+            index index.html;
+            server_name _;
+     }
+
+  server {
+    listen 80 default_server;
+    server_name _;
+    return 301 https://\$host\$request_uri;
+  }
+
+  upstream controller {
+    server localhost:${services[controller]};
+  }
+  server {
+    listen *:${services[controller-ssl]} ssl default_server;
+    server_name controller;
+    ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+    ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+    location  / {
+      proxy_pass http://controller;
+      proxy_set_header Host \$http_host;
+      proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+      proxy_set_header X-Forwarded-Proto https;
+      proxy_redirect off;
+    }
+  }
+
+upstream arvados-ws {
+  server localhost:${services[websockets]};
+}
+server {
+  listen *:${services[websockets-ssl]} ssl default_server;
+  server_name           websockets;
+
+  proxy_connect_timeout 90s;
+  proxy_read_timeout    300s;
+
+  ssl                   on;
+  ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+  ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+
+  location / {
+    proxy_pass          http://arvados-ws;
+    proxy_set_header    Upgrade         \$http_upgrade;
+    proxy_set_header    Connection      "upgrade";
+    proxy_set_header Host \$http_host;
+    proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+  }
+}
+
+  upstream workbench2 {
+    server localhost:${services[workbench2]};
+  }
+  server {
+    listen *:${services[workbench2-ssl]} ssl default_server;
+    server_name workbench2;
+    ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+    ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+    location  / {
+      proxy_pass http://workbench2;
+      proxy_set_header Host \$http_host;
+      proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+      proxy_set_header X-Forwarded-Proto https;
+      proxy_redirect off;
+    }
+    location  /sockjs-node {
+      proxy_pass http://workbench2;
+      proxy_set_header    Upgrade         \$http_upgrade;
+      proxy_set_header    Connection      "upgrade";
+      proxy_set_header Host \$http_host;
+      proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+    }
+  }
+
+  upstream keep-web {
+    server localhost:${services[keep-web]};
+  }
+  server {
+    listen *:${services[keep-web-ssl]} ssl default_server;
+    server_name keep-web;
+    ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+    ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+    location  / {
+      proxy_pass http://keep-web;
+      proxy_set_header Host \$http_host;
+      proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+      proxy_set_header X-Forwarded-Proto https;
+      proxy_redirect off;
+    }
+  }
+
+}
+
+EOF
+
+exec nginx -c /var/lib/arvados/nginx.conf
diff --git a/tools/arvbox/lib/arvbox/docker/service/nginx/run-service b/tools/arvbox/lib/arvbox/docker/service/nginx/run-service
deleted file mode 100755 (executable)
index cf72ed2..0000000
+++ /dev/null
@@ -1,119 +0,0 @@
-#!/bin/bash
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-exec 2>&1
-set -ex -o pipefail
-
-. /usr/local/lib/arvbox/common.sh
-
-cat <<EOF >/var/lib/arvados/nginx.conf
-worker_processes auto;
-pid /var/lib/arvados/nginx.pid;
-
-error_log stderr;
-daemon off;
-
-events {
-       worker_connections 64;
-}
-
-http {
-     access_log off;
-     include /etc/nginx/mime.types;
-     default_type application/octet-stream;
-     server {
-            listen ${services[doc]} default_server;
-            listen [::]:${services[doc]} default_server;
-            root /usr/src/arvados/doc/.site;
-            index index.html;
-            server_name _;
-     }
-
-  upstream controller {
-    server localhost:${services[controller]};
-  }
-  server {
-    listen *:${services[controller-ssl]} ssl default_server;
-    server_name controller;
-    ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
-    ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
-    location  / {
-      proxy_pass http://controller;
-      proxy_set_header Host \$http_host;
-      proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
-      proxy_set_header X-Forwarded-Proto https;
-      proxy_redirect off;
-    }
-  }
-
-upstream arvados-ws {
-  server localhost:${services[websockets]};
-}
-server {
-  listen *:${services[websockets-ssl]} ssl default_server;
-  server_name           websockets;
-
-  proxy_connect_timeout 90s;
-  proxy_read_timeout    300s;
-
-  ssl                   on;
-  ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
-  ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
-
-  location / {
-    proxy_pass          http://arvados-ws;
-    proxy_set_header    Upgrade         \$http_upgrade;
-    proxy_set_header    Connection      "upgrade";
-    proxy_set_header Host \$http_host;
-    proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
-  }
-}
-
-  upstream workbench2 {
-    server localhost:${services[workbench2]};
-  }
-  server {
-    listen *:${services[workbench2-ssl]} ssl default_server;
-    server_name workbench2;
-    ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
-    ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
-    location  / {
-      proxy_pass http://workbench2;
-      proxy_set_header Host \$http_host;
-      proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
-      proxy_set_header X-Forwarded-Proto https;
-      proxy_redirect off;
-    }
-    location  /sockjs-node {
-      proxy_pass http://workbench2;
-      proxy_set_header    Upgrade         \$http_upgrade;
-      proxy_set_header    Connection      "upgrade";
-      proxy_set_header Host \$http_host;
-      proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
-    }
-  }
-
-  upstream keep-web {
-    server localhost:${services[keep-web]};
-  }
-  server {
-    listen *:${services[keep-web-ssl]} ssl default_server;
-    server_name keep-web;
-    ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
-    ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
-    location  / {
-      proxy_pass http://keep-web;
-      proxy_set_header Host \$http_host;
-      proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
-      proxy_set_header X-Forwarded-Proto https;
-      proxy_redirect off;
-    }
-  }
-
-}
-
-EOF
-
-exec nginx -c /var/lib/arvados/nginx.conf
index 71bf38357b885952fd9c327317a458a81ab48984..aadc775823caf136c7f7094a0d2b55fcb50f4478 100644 (file)
@@ -4,6 +4,7 @@
 
 import argparse
 import gzip
+from io import open
 import logging
 import sys
 
@@ -41,6 +42,31 @@ class ArgumentParser(argparse.ArgumentParser):
             help='Log more information (once for progress, twice for debug)')
 
 
+class UTF8Decode(object):
+    '''Wrap a file-like iterable to decode UTF-8 bytes into strings
+    '''
+    def __init__(self, fh):
+        self.fh = fh
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return next(self.fh).decode('utf-8')
+
+    next = __next__
+
+    def close(self):
+        # mimic Gzip behavior and don't close underlying object
+        pass
+
+
 class Command(object):
     def __init__(self, args):
         self.args = args
@@ -57,9 +83,9 @@ class Command(object):
             self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
         elif self.args.log_file:
             if self.args.log_file.endswith('.gz'):
-                fh = gzip.open(self.args.log_file)
+                fh = UTF8Decode(gzip.open(self.args.log_file))
             else:
-                fh = open(self.args.log_file)
+                fh = open(self.args.log_file, mode='r', encoding='utf-8')
             self.summer = summarizer.Summarizer(fh, **kwargs)
         else:
             self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
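
The UTF8Decode wrapper added above lets the summarizer iterate gzip-compressed logs as text on both Python 2 and 3, since gzip.open() yields bytes. A minimal usage sketch (the sample filename is hypothetical):

    import gzip
    from crunchstat_summary.command import UTF8Decode

    # gzip.open() returns a binary stream; UTF8Decode decodes each line to
    # text, and its close() is a deliberate no-op so the wrapped object
    # stays usable by the caller.
    with UTF8Decode(gzip.open('crunchstat.txt.gz')) as lines:
        for line in lines:
            print(line.rstrip())
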
index 1314e9df35612817e260e6644212ef2e8a387bc3..6df440a14e37f87f8fcea5cac7c57ca1269915b4 100644 (file)
@@ -18,7 +18,7 @@ class DygraphsChart(crunchstat_summary.webchart.WebChart):
             'data': self._collate_data(tasks, stat),
             'options': {
                 'connectSeparatedPoints': True,
-                'labels': ['elapsed']+[uuid for uuid, _ in tasks.iteritems()],
+                'labels': ['elapsed']+[uuid for uuid, _ in tasks.items()],
                 'title': '{}: {} {}'.format(label, stat[0], stat[1]),
             },
         }
@@ -26,7 +26,7 @@ class DygraphsChart(crunchstat_summary.webchart.WebChart):
     def _collate_data(self, tasks, stat):
         data = []
         nulls = []
-        for uuid, task in tasks.iteritems():
+        for uuid, task in tasks.items():
             for pt in task.series[stat]:
                 data.append([pt[0].total_seconds()] + nulls + [pt[1]])
             nulls.append(None)
index 311c006c07d882a40ee5af8eaae651ba1e3c7145..8ccdbc2fcf04e45ca3ab3ec6e2270933d050ea1c 100644 (file)
@@ -2,11 +2,9 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-from __future__ import print_function
-
 import arvados
 import itertools
-import Queue
+import queue
 import threading
 
 from crunchstat_summary import logger
@@ -87,19 +85,21 @@ class LiveLogReader(object):
             self._queue.put(self.EOF)
 
     def __iter__(self):
-        self._queue = Queue.Queue()
+        self._queue = queue.Queue()
         self._thread = threading.Thread(target=self._get_all_pages)
         self._thread.daemon = True
         self._thread.start()
         return self
 
-    def next(self):
+    def __next__(self):
         line = self._queue.get()
         if line is self.EOF:
             self._thread.join()
             raise StopIteration
         return line
 
+    next = __next__ # for Python 2
+
     def __enter__(self):
         return self
 
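
The LiveLogReader change above follows the usual pattern for an iterator that must work on both Python 2 and 3: implement __next__() and alias it to next. A toy illustration of the same protocol (not part of the patch):

    class CountDown(object):
        '''Counts n-1 .. 0 using the same dual-protocol pattern as LiveLogReader.'''
        def __init__(self, n):
            self.n = n

        def __iter__(self):
            return self

        def __next__(self):       # Python 3 calls __next__()
            if self.n <= 0:
                raise StopIteration
            self.n -= 1
            return self.n

        next = __next__           # Python 2 calls next()

    assert list(CountDown(3)) == [2, 1, 0]
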
index b2f6f1bb700b6d5d2a04f0212c699eb1ace15435..bf905a394606a4e9a1465979a575bf07e85e0657 100644 (file)
@@ -2,8 +2,6 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-from __future__ import print_function
-
 import arvados
 import collections
 import crunchstat_summary.dygraphs
@@ -24,7 +22,7 @@ from crunchstat_summary import logger
 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
 # that have amounts like 7.5 GiB according to the kernel.)
 AVAILABLE_RAM_RATIO = 0.95
-
+MB = 2**20
 
 # Workaround datetime.datetime.strptime() thread-safety bug by calling
 # it once before starting threads.  https://bugs.python.org/issue7980
@@ -209,7 +207,7 @@ class Summarizer(object):
                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                 if 'tx' in stats or 'rx' in stats:
                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
-                for stat, val in stats.iteritems():
+                for stat, val in stats.items():
                     if group == 'interval':
                         if stat == 'seconds':
                             this_interval_s = val
@@ -236,9 +234,9 @@ class Summarizer(object):
 
         self.job_tot = collections.defaultdict(
             functools.partial(collections.defaultdict, int))
-        for task_id, task_stat in self.task_stats.iteritems():
-            for category, stat_last in task_stat.iteritems():
-                for stat, val in stat_last.iteritems():
+        for task_id, task_stat in self.task_stats.items():
+            for category, stat_last in task_stat.items():
+                for stat, val in stat_last.items():
                     if stat in ['cpus', 'cache', 'swap', 'rss']:
                         # meaningless stats like 16 cpu cores x 5 tasks = 80
                         continue
@@ -273,8 +271,8 @@ class Summarizer(object):
 
     def _text_report_gen(self):
         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
-        for category, stat_max in sorted(self.stats_max.iteritems()):
-            for stat, val in sorted(stat_max.iteritems()):
+        for category, stat_max in sorted(self.stats_max.items()):
+            for stat, val in sorted(stat_max.items()):
                 if stat.endswith('__rate'):
                     continue
                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
@@ -292,7 +290,7 @@ class Summarizer(object):
                  self.stats_max['cpu']['user+sys__rate'],
                  lambda x: x * 100),
                 ('Overall CPU usage: {}%',
-                 self.job_tot['cpu']['user+sys'] /
+                 float(self.job_tot['cpu']['user+sys']) /
                  self.job_tot['time']['elapsed']
                  if self.job_tot['time']['elapsed'] > 0 else 0,
                  lambda x: x * 100),
@@ -325,6 +323,7 @@ class Summarizer(object):
             yield "# "+format_string.format(self._format(val))
 
     def _recommend_gen(self):
+        # TODO recommend fixing job granularity if elapsed time is too short
         return itertools.chain(
             self._recommend_cpu(),
             self._recommend_ram(),
@@ -335,21 +334,27 @@ class Summarizer(object):
 
         constraint_key = self._map_runtime_constraint('vcpus')
         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
-        if cpu_max_rate == float('-Inf'):
+        if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
             logger.warning('%s: no CPU usage data', self.label)
             return
+        # TODO: Don't recommend based on the isolated max peak alone;
+        # take average CPU usage into account as well, or the % of time at max
         used_cores = max(1, int(math.ceil(cpu_max_rate)))
         asked_cores = self.existing_constraints.get(constraint_key)
-        if asked_cores is None or used_cores < asked_cores:
+        if asked_cores is None:
+            asked_cores = 1
+        # TODO: This should be more nuanced in cases where max >> avg
+        if used_cores < asked_cores:
             yield (
                 '#!! {} max CPU usage was {}% -- '
-                'try runtime_constraints "{}":{}'
+                'try reducing runtime_constraints to "{}":{}'
             ).format(
                 self.label,
-                int(math.ceil(cpu_max_rate*100)),
+                math.ceil(cpu_max_rate*100),
                 constraint_key,
                 int(used_cores))
 
+    # FIXME: This needs to be updated to account for current nodemanager algorithms
     def _recommend_ram(self):
         """Recommend an economical RAM constraint for this job.
 
@@ -389,20 +394,20 @@ class Summarizer(object):
         if used_bytes == float('-Inf'):
             logger.warning('%s: no memory usage data', self.label)
             return
-        used_mib = math.ceil(float(used_bytes) / 1048576)
+        used_mib = math.ceil(float(used_bytes) / MB)
         asked_mib = self.existing_constraints.get(constraint_key)
 
         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
-        if asked_mib is None or (
-                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
+        if used_mib > 0 and (asked_mib is None or (
+                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
             yield (
                 '#!! {} max RSS was {} MiB -- '
-                'try runtime_constraints "{}":{}'
+                'try reducing runtime_constraints to "{}":{}'
             ).format(
                 self.label,
                 int(used_mib),
                 constraint_key,
-                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))
+                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
 
     def _recommend_keep_cache(self):
         """Recommend increasing keep cache if utilization < 80%"""
@@ -411,17 +416,18 @@ class Summarizer(object):
             return
         utilization = (float(self.job_tot['blkio:0:0']['read']) /
                        float(self.job_tot['net:keep0']['rx']))
-        asked_mib = self.existing_constraints.get(constraint_key, 256)
+        # FIXME: the default on this get won't work correctly
+        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
 
         if utilization < 0.8:
             yield (
                 '#!! {} Keep cache utilization was {:.2f}% -- '
-                'try runtime_constraints "{}":{} (or more)'
+                'try doubling runtime_constraints to "{}":{} (or more)'
             ).format(
                 self.label,
                 utilization * 100.0,
                 constraint_key,
-                asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())
+                math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
 
 
     def _format(self, val):
@@ -513,7 +519,7 @@ class ProcessSummarizer(Summarizer):
 
 
 class JobSummarizer(ProcessSummarizer):
-    runtime_constraint_mem_unit = 1048576
+    runtime_constraint_mem_unit = MB
     map_runtime_constraint = {
         'keep_cache_ram': 'keep_cache_mb_per_task',
         'ram': 'min_ram_mb_per_node',
@@ -539,7 +545,7 @@ class MultiSummarizer(object):
 
     def run(self):
         threads = []
-        for child in self.children.itervalues():
+        for child in self.children.values():
             self.throttle.acquire()
             t = threading.Thread(target=self.run_and_release, args=(child.run, ))
             t.daemon = True
@@ -551,7 +557,7 @@ class MultiSummarizer(object):
     def text_report(self):
         txt = ''
         d = self._descendants()
-        for child in d.itervalues():
+        for child in d.values():
             if len(d) > 1:
                 txt += '### Summary for {} ({})\n'.format(
                     child.label, child.process['uuid'])
@@ -566,7 +572,7 @@ class MultiSummarizer(object):
         MultiSummarizers) are omitted.
         """
         d = collections.OrderedDict()
-        for key, child in self.children.iteritems():
+        for key, child in self.children.items():
             if isinstance(child, Summarizer):
                 d[key] = child
             if isinstance(child, MultiSummarizer):
@@ -574,7 +580,7 @@ class MultiSummarizer(object):
         return d
 
     def html_report(self):
-        return WEBCHART_CLASS(self.label, self._descendants().itervalues()).html()
+        return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
 
 
 class JobTreeSummarizer(MultiSummarizer):
@@ -588,7 +594,7 @@ class JobTreeSummarizer(MultiSummarizer):
             preloaded = {}
             for j in arv.jobs().index(
                     limit=len(job['components']),
-                    filters=[['uuid','in',job['components'].values()]]).execute()['items']:
+                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
                 preloaded[j['uuid']] = j
             for cname in sorted(job['components'].keys()):
                 child_uuid = job['components'][cname]
@@ -605,7 +611,7 @@ class JobTreeSummarizer(MultiSummarizer):
 class PipelineSummarizer(MultiSummarizer):
     def __init__(self, instance, **kwargs):
         children = collections.OrderedDict()
-        for cname, component in instance['components'].iteritems():
+        for cname, component in instance['components'].items():
             if 'job' not in component:
                 logger.warning(
                     "%s: skipping component with no job assigned", cname)
@@ -663,7 +669,7 @@ class ContainerTreeSummarizer(MultiSummarizer):
                         cr['name'] = cr.get('name') or cr['uuid']
                         todo.append(cr)
         sorted_children = collections.OrderedDict()
-        for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
+        for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
             sorted_children[uuid] = children[uuid]
         super(ContainerTreeSummarizer, self).__init__(
             children=sorted_children,
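
With the new MB constant, the RAM recommendation arithmetic is easier to follow. The sketch below reproduces the figures seen in the updated test reports; the mem_unit argument stands in for _runtime_constraint_mem_unit() (MB for jobs and, judging from the container reports, bytes for containers), so treat that mapping as an assumption rather than a verified API:

    import math

    AVAILABLE_RAM_RATIO = 0.95
    MB = 2**20

    def recommended_ram(used_bytes, mem_unit):
        # Round peak RSS up to whole MiB, then up to the next "nearly a GiB"
        # slot (a nominal GiB of which only 95% is assumed usable).
        used_mib = math.ceil(float(used_bytes) / MB)
        nearlygibs = lambda mebibytes: mebibytes / AVAILABLE_RAM_RATIO / 1024
        return int(math.ceil(nearlygibs(used_mib)) * AVAILABLE_RAM_RATIO * 1024 * MB / mem_unit)

    # A 334 MiB peak RSS rounds up to one nearly-GiB slot:
    assert recommended_ram(334 * MB, MB) == 972         # "min_ram_mb_per_node":972
    assert recommended_ram(334 * MB, 1) == 1020054732   # "ram":1020054732
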
index 9d18883ce2506d71abe03e08abde2fee28006343..cf0c1e67aa1ffdcf7853b2b1271bb2f03b16bae2 100644 (file)
@@ -2,7 +2,11 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-import cgi
+try:
+    from html import escape
+except ImportError:
+    from cgi import escape
+
 import json
 import pkg_resources
 
@@ -27,13 +31,13 @@ class WebChart(object):
         <script type="text/javascript">{}</script>
         {}
         </head><body></body></html>
-        '''.format(cgi.escape(self.label),
+        '''.format(escape(self.label),
                    self.JSLIB, self.js(), self.headHTML())
 
     def js(self):
         return 'var chartdata = {};\n{}'.format(
             json.dumps(self.sections()),
-            '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa) for jsa in self.JSASSETS]))
+            '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa).decode('utf-8') for jsa in self.JSASSETS]))
 
     def sections(self):
         return [
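
The try/except import above prefers html.escape() on Python 3 and falls back to cgi.escape() on Python 2, where the html module does not exist (cgi.escape was later removed in Python 3.8). The two functions differ in their default handling of quote characters, so results are only guaranteed identical when quote is passed explicitly, as in this small check:

    try:
        from html import escape   # Python 3; default quote=True
    except ImportError:
        from cgi import escape    # Python 2; default quote=False

    # With quote passed explicitly, both implementations agree.
    assert escape('<b>"x" & y</b>', quote=False) == '&lt;b&gt;"x" &amp; y&lt;/b&gt;'
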
index 98194619495816bfa4a612710ad1506dcbcab114..0691e4f1ef4ea7e1604a0b7b73787f25b7dd7e58 100644 (file)
@@ -14,11 +14,9 @@ time elapsed 10      -       10
 # Number of tasks: 1
 # Max CPU time spent by a single task: 0s
 # Max CPU usage in a single interval: 0%
-# Overall CPU usage: 0%
+# Overall CPU usage: 0.00%
 # Max memory used by a single task: 0.00GB
 # Max network traffic in a single task: 0.00GB
 # Max network speed in a single interval: 0.00MB/s
 # Keep cache miss rate 0.00%
 # Keep cache utilization 0.00%
-#!! container max CPU usage was 0% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 0 MiB -- try runtime_constraints "ram":0
index b61da154ffbdfd720ce2f69d1ee76e01cf2b687a..c64c34c80ec6cd775e81031330070c047265a96d 100644 (file)
@@ -23,5 +23,4 @@ time  elapsed 20      -       20
 # Max network speed in a single interval: 0.00MB/s
 # Keep cache miss rate 0.00%
 # Keep cache utilization 0.00%
-#!! container max CPU usage was 24% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 67 MiB -- try runtime_constraints "ram":1020054732
+#!! container max RSS was 67 MiB -- try reducing runtime_constraints to "ram":1020054732
index 9d3cd78d3f81f88239b1521c9a530b7898686e60..3075c24b951020d1444311bc083d269b581219b2 100644 (file)
@@ -34,5 +34,4 @@ time  elapsed 20      -       20
 # Max network speed in a single interval: 0.00MB/s
 # Keep cache miss rate 0.00%
 # Keep cache utilization 0.00%
-#!! container max CPU usage was 24% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 67 MiB -- try runtime_constraints "ram":1020054732
+#!! container max RSS was 67 MiB -- try reducing runtime_constraints to "ram":1020054732
index f0a60957bba706b18b6d41288f5fc46ded754b1c..5e3ad152f7e0e48759312592344cdc936eb95f23 100644 (file)
@@ -31,5 +31,4 @@ time  elapsed 80      -       80
 # Max network speed in a single interval: 42.58MB/s
 # Keep cache miss rate 0.00%
 # Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-jq0ekny1xou3zoh max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
index f9a34cfb98c7c6d42a633166b946b7c49db77cc1..e260ca5bdeeed232ee61e094c17fe1ccfad5063f 100644 (file)
@@ -20,5 +20,4 @@ time  elapsed 2       -       4
 # Max network speed in a single interval: 0.00MB/s
 # Keep cache miss rate 0.00%
 # Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-zvb2ocfycpomrup max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
index c54102d78a25d1158ba563a22de7279b3d39c2f2..ffe1072250123f2b05f67ddd62da2bf0881b35a1 100644 (file)
@@ -20,5 +20,4 @@ time  elapsed 2       -       3
 # Max network speed in a single interval: 0.00MB/s
 # Keep cache miss rate 0.00%
 # Keep cache utilization 0.00%
-#!! 4xphq-8i9sb-v831jm2uq0g2g9x max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
-#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try reducing runtime_constraints to "min_ram_mb_per_node":972
index af92becd80a6875d64e1d406d2b21f8bfbd6ec57..7603ea488c37e4a74f7946d3e108116329b0bad2 100644 (file)
@@ -8,20 +8,25 @@ import crunchstat_summary.command
 import difflib
 import glob
 import gzip
+from io import open
 import mock
 import os
+import sys
 import unittest
 
+from crunchstat_summary.command import UTF8Decode
+
 TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
 
 
 class ReportDiff(unittest.TestCase):
     def diff_known_report(self, logfile, cmd):
         expectfile = logfile+'.report'
-        expect = open(expectfile).readlines()
+        with open(expectfile, encoding='utf-8') as f:
+            expect = f.readlines()
         self.diff_report(cmd, expect, expectfile=expectfile)
 
-    def diff_report(self, cmd, expect, expectfile=None):
+    def diff_report(self, cmd, expect, expectfile='(expected)'):
         got = [x+"\n" for x in cmd.report().strip("\n").split("\n")]
         self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
             expect, got, fromfile=expectfile, tofile="(generated)")))
@@ -49,12 +54,15 @@ class HTMLFromFile(ReportDiff):
                 ['--format=html', '--log-file', logfile])
             cmd = crunchstat_summary.command.Command(args)
             cmd.run()
-            self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
+            if sys.version_info >= (3, 2):
+                self.assertRegex(cmd.report(), r'(?is)<html>.*</html>\s*$')
+            else:
+                self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
 
 
 class SummarizeEdgeCases(unittest.TestCase):
     def test_error_messages(self):
-        logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'))
+        logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'), encoding='utf-8')
         s = crunchstat_summary.summarizer.Summarizer(logfile)
         s.run()
 
@@ -89,9 +97,9 @@ class SummarizeContainer(ReportDiff):
             'container.json', 'crunchstat.txt', 'arv-mount.txt']
         def _open(n):
             if n == "crunchstat.txt":
-                return gzip.open(self.logfile)
+                return UTF8Decode(gzip.open(self.logfile))
             elif n == "arv-mount.txt":
-                return gzip.open(self.arvmountlog)
+                return UTF8Decode(gzip.open(self.arvmountlog))
         mock_cr().open.side_effect = _open
         args = crunchstat_summary.command.ArgumentParser().parse_args(
             ['--job', self.fake_request['uuid']])
@@ -114,7 +122,7 @@ class SummarizeJob(ReportDiff):
     def test_job_report(self, mock_api, mock_cr):
         mock_api().jobs().get().execute.return_value = self.fake_job
         mock_cr().__iter__.return_value = ['fake-logfile.txt']
-        mock_cr().open.return_value = gzip.open(self.logfile)
+        mock_cr().open.return_value = UTF8Decode(gzip.open(self.logfile))
         args = crunchstat_summary.command.ArgumentParser().parse_args(
             ['--job', self.fake_job_uuid])
         cmd = crunchstat_summary.command.Command(args)
@@ -175,15 +183,14 @@ class SummarizePipeline(ReportDiff):
         mock_api().pipeline_instances().get().execute. \
             return_value = self.fake_instance
         mock_cr().__iter__.return_value = ['fake-logfile.txt']
-        mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+        mock_cr().open.side_effect = [UTF8Decode(gzip.open(logfile)) for _ in range(3)]
         args = crunchstat_summary.command.ArgumentParser().parse_args(
             ['--pipeline-instance', self.fake_instance['uuid']])
         cmd = crunchstat_summary.command.Command(args)
         cmd.run()
 
-        job_report = [
-            line for line in open(logfile+'.report').readlines()
-            if not line.startswith('#!! ')]
+        with open(logfile+'.report', encoding='utf-8') as f:
+            job_report = [line for line in f if not line.startswith('#!! ')]
         expect = (
             ['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
             job_report + ['\n'] +
@@ -251,15 +258,14 @@ class SummarizeACRJob(ReportDiff):
         mock_api().jobs().index().execute.return_value = self.fake_jobs_index
         mock_api().jobs().get().execute.return_value = self.fake_job
         mock_cr().__iter__.return_value = ['fake-logfile.txt']
-        mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+        mock_cr().open.side_effect = [UTF8Decode(gzip.open(logfile)) for _ in range(3)]
         args = crunchstat_summary.command.ArgumentParser().parse_args(
             ['--job', self.fake_job['uuid']])
         cmd = crunchstat_summary.command.Command(args)
         cmd.run()
 
-        job_report = [
-            line for line in open(logfile+'.report').readlines()
-            if not line.startswith('#!! ')]
+        with open(logfile+'.report', encoding='utf-8') as f:
+            job_report = [line for line in f if not line.startswith('#!! ')]
         expect = (
             ['### Summary for zzzzz-8i9sb-i3e77t9z5y8j9cc (partial) (zzzzz-8i9sb-i3e77t9z5y8j9cc)\n',
              '(no report generated)\n',