Merge branch 'master' into 13937-keepstore-prometheus
author Lucas Di Pentima <ldipentima@veritasgenetics.com>
Wed, 6 Mar 2019 21:09:28 +0000 (18:09 -0300)
committer Lucas Di Pentima <ldipentima@veritasgenetics.com>
Wed, 6 Mar 2019 21:09:28 +0000 (18:09 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

94 files changed:
.licenseignore
apps/workbench/.gitignore
apps/workbench/app/views/layouts/body.html.erb
apps/workbench/config/application.default.yml
apps/workbench/config/initializers/validate_wb2_url_config.rb [new file with mode: 0644]
apps/workbench/lib/config_validators.rb [new file with mode: 0644]
apps/workbench/package-build.version [deleted file]
apps/workbench/test/integration/application_layout_test.rb
apps/workbench/test/the.patch [deleted file]
build/package-build-dockerfiles/centos7/Dockerfile
build/package-build-dockerfiles/debian8/Dockerfile
build/package-build-dockerfiles/debian9/Dockerfile
build/package-build-dockerfiles/ubuntu1404/Dockerfile
build/package-build-dockerfiles/ubuntu1604/Dockerfile
build/package-build-dockerfiles/ubuntu1804/Dockerfile
build/run-build-docker-jobs-image.sh
build/run-build-packages-python-and-ruby.sh
build/run-build-packages.sh
build/run-library.sh
build/run-tests.sh
cmd/arvados-server/arvados-dispatch-cloud.service
doc/_includes/_install_compute_docker.liquid
doc/admin/upgrading.html.textile.liquid
doc/install/install-manual-prerequisites.html.textile.liquid
doc/user/topics/arv-docker.html.textile.liquid
docker/jobs/1078ECD7.key [new file with mode: 0644]
docker/jobs/Dockerfile
docker/jobs/apt.arvados.org-dev.list [new file with mode: 0644]
docker/jobs/apt.arvados.org-stable.list [new file with mode: 0644]
docker/jobs/apt.arvados.org-testing.list [new file with mode: 0644]
docker/jobs/apt.arvados.org.list [deleted file]
lib/cloud/azure/azure.go
lib/cloud/azure/azure_test.go
lib/cloud/interfaces.go
lib/controller/cmd.go
lib/controller/federation_test.go
lib/controller/handler_test.go
lib/controller/server_test.go
lib/dispatchcloud/cmd.go
lib/dispatchcloud/container/queue.go
lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/node_size.go
lib/dispatchcloud/scheduler/fix_stale_locks.go
lib/dispatchcloud/scheduler/run_queue.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/scheduler/scheduler.go
lib/dispatchcloud/scheduler/sync.go
lib/dispatchcloud/ssh_executor/executor.go
lib/dispatchcloud/ssh_executor/executor_test.go
lib/dispatchcloud/test/logger.go [deleted file]
lib/dispatchcloud/test/ssh_service.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/pool_test.go
lib/dispatchcloud/worker/verify.go [new file with mode: 0644]
lib/dispatchcloud/worker/worker.go
lib/dispatchcloud/worker/worker_test.go
lib/service/cmd.go
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvdocker.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/tests/test_container.py
sdk/go/arvados/config.go
sdk/go/ctxlog/log.go
sdk/java/src/main/java/org/arvados/sdk/Arvados.java
sdk/python/arvados/_normalize_stream.py
sdk/python/arvados/collection.py
sdk/python/arvados/commands/keepdocker.py
sdk/python/arvados/commands/put.py
sdk/python/setup.py
sdk/python/tests/test_arv_put.py
sdk/ruby/lib/arvados/collection.rb
sdk/ruby/test/test_collection.rb
services/api/.gitignore
services/api/app/models/container.rb
services/api/test/unit/container_test.rb
services/crunch-run/background.go
services/crunch-run/crunchrun.go
services/fuse/arvados_fuse/fresh.py
services/fuse/setup.py
services/keepstore/azure_blob_volume.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/keepstore_test.go
services/keepstore/volume_test.go
services/ws/session_v0.go
tools/arvbox/lib/arvbox/docker/service/workbench/run-service

index 06519a98e8bc45afcebdad584198a6b6bb47bf71..45028bf888ff6a40f910f29197aaac1a8d29516f 100644 (file)
@@ -15,7 +15,8 @@ build/package-test-dockerfiles/ubuntu1604/etc-apt-preferences.d-arvados
 doc/fonts/*
 doc/user/cwl/federated/*
 */docker_image
-docker/jobs/apt.arvados.org.list
+docker/jobs/apt.arvados.org*.list
+docker/jobs/1078ECD7.key
 */en.bootstrap.yml
 *font-awesome.css
 *.gif
index 156fc86a5eadee7b9cef56c004db808c8a3d8d03..25c7c3ef24ea04acc225e1d1295d424d3959a0cc 100644 (file)
@@ -44,3 +44,6 @@
 # npm-rails
 /node_modules
 /npm-debug.log
+
+# Generated when building distribution packages
+/package-build.version
index b017b4a29ae2bbd35877301f5a6f021555eb6f11..b2cd097f3174c03fa49c7d1f280d7c809ecf280c 100644 (file)
@@ -82,6 +82,21 @@ SPDX-License-Identifier: AGPL-3.0 %>
                      </form>
                     </li>
                   <% end %>
+                <% if Rails.configuration.workbench2_url %>
+                <li role="menuitem">
+                  <%
+                    wb2_url = Rails.configuration.workbench2_url
+                    wb2_url += '/' if wb2_url[-1] != '/'
+                    wb2_url += 'token'
+                  %>
+                  <form action="<%= wb2_url %>" method="GET">
+                    <input type="hidden" name="api_token" value="<%= Thread.current[:arvados_api_token] %>">
+                    <button role="menuitem" type="submit">
+                      <i class="fa fa-lg fa-share-square fa-fw"></i> Go to Workbench 2
+                    </button>
+                  </form>
+                </li>
+                <% end %>
                 <li role="menuitem">
                   <%= link_to virtual_machines_user_path(current_user), role: 'menu-item' do %>
                     <i class="fa fa-lg fa-terminal fa-fw"></i> Virtual machines
index 4e0a35a5550360252cae77e49e22ac1d7dec370f..ccc7e4bbddaaf8c6396fe33c863b96f1bbf54235 100644 (file)
@@ -326,3 +326,11 @@ common:
   # the jobs api is disabled and there are no local git repositories.
   #
   repositories: true
+
+  #
+  # Add an item to the user menu pointing to workbench2_url, if not false.
+  #
+  # Example:
+  # workbench2_url: https://workbench2.qr1hi.arvadosapi.com
+  #
+  workbench2_url: false
diff --git a/apps/workbench/config/initializers/validate_wb2_url_config.rb b/apps/workbench/config/initializers/validate_wb2_url_config.rb
new file mode 100644 (file)
index 0000000..f909648
--- /dev/null
@@ -0,0 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+include ConfigValidators
+
+ConfigValidators::validate_wb2_url_config()
\ No newline at end of file
diff --git a/apps/workbench/lib/config_validators.rb b/apps/workbench/lib/config_validators.rb
new file mode 100644 (file)
index 0000000..ec76916
--- /dev/null
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require 'uri'
+
+module ConfigValidators
+    def validate_wb2_url_config
+        if Rails.configuration.workbench2_url
+            begin
+                if !URI.parse(Rails.configuration.workbench2_url).is_a?(URI::HTTP)
+                    Rails.logger.warn("workbench2_url config is not an HTTP URL: #{Rails.configuration.workbench2_url}")
+                    Rails.configuration.workbench2_url = false
+                elsif /.*[\/]{2,}$/.match(Rails.configuration.workbench2_url)
+                    Rails.logger.warn("workbench2_url config shouldn't have multiple trailing slashes: #{Rails.configuration.workbench2_url}")
+                    Rails.configuration.workbench2_url = false
+                else
+                    return true
+                end
+            rescue URI::InvalidURIError
+                Rails.logger.warn("workbench2_url config invalid URL: #{Rails.configuration.workbench2_url}")
+                Rails.configuration.workbench2_url = false
+            end
+        end
+        return false
+    end
+end
+
diff --git a/apps/workbench/package-build.version b/apps/workbench/package-build.version
deleted file mode 100644 (file)
index 41eb2c7..0000000
+++ /dev/null
@@ -1 +0,0 @@
-1.2.1.20181126194329
index 74a42877b1301f52b65e557b75bbac60165439f6..b3f704cdd98feb3be31326b2cbaf6451c5aa7925 100644 (file)
@@ -140,6 +140,30 @@ class ApplicationLayoutTest < ActionDispatch::IntegrationTest
     end
   end
 
+  [
+    [false, false],
+    ['http://wb2.example.org//', false],
+    ['ftp://wb2.example.org', false],
+    ['wb2.example.org', false],
+    ['http://wb2.example.org', true],
+    ['https://wb2.example.org', true],
+    ['http://wb2.example.org/', true],
+    ['https://wb2.example.org/', true],
+  ].each do |wb2_url_config, wb2_menu_appear|
+    test "workbench2_url=#{wb2_url_config} should#{wb2_menu_appear ? '' : ' not'} show WB2 menu" do
+      Rails.configuration.workbench2_url = wb2_url_config
+      assert_equal wb2_menu_appear, ConfigValidators::validate_wb2_url_config()
+
+      visit page_with_token('active')
+      within('.navbar-fixed-top') do
+        page.find("#notifications-menu").click
+        within('.dropdown-menu') do
+          assert_equal wb2_menu_appear, page.has_text?('Go to Workbench 2')
+        end
+      end
+    end
+  end
+
   [
     ['active', true],
     ['active_with_prefs_profile_no_getting_started_shown', false],
diff --git a/apps/workbench/test/the.patch b/apps/workbench/test/the.patch
deleted file mode 100644 (file)
index 5a55679..0000000
+++ /dev/null
@@ -1,3 +0,0 @@
-+    echo -n 'geckodriver: '
-+    which geckodriver || fatal "No geckodriver. Unable to find Mozilla geckodriver. Please download the server from https://github.com/mozilla/geckodriver/releases and place it somewhere on your PATH. More info at https://developer.mozilla.org/en-US/docs/Mozilla/QA/Marionette/WebDriver."
-
index 522189a20cd92625652a4ee38d9c2f02222135b4..ad6f4e1e8f5051c8bb90e449eb89873ca13ea107 100644 (file)
@@ -32,14 +32,14 @@ RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
 RUN touch /var/lib/rpm/* && yum -q -y install rh-python35
 RUN scl enable rh-python35 "easy_install-3.5 pip" && easy_install-2.7 pip
 
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
 # Add epel, we need it for the python-pam dependency
 RUN wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
 RUN rpm -ivh epel-release-latest-7.noarch.rpm
 
 RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
 
+# The version of setuptools that comes with CentOS is way too old
+RUN pip install --upgrade setuptools
+
 ENV WORKSPACE /arvados
 CMD ["scl", "enable", "rh-python35", "/usr/local/rvm/bin/rvm-exec default bash /jenkins/run-build-packages.sh --target centos7"]
index 1e6c62ce7ea72fba1380260bb2c7c07dbc19c87c..3f591cdfa14aceab1ff1b4be1c650192d396131f 100644 (file)
@@ -8,7 +8,7 @@ MAINTAINER Ward Vandewege <ward@curoverse.com>
 ENV DEBIAN_FRONTEND noninteractive
 
 # Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip python3-venv python3-dev
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools python3-pip libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip python3-venv python3-dev
 
 # Install virtualenv
 RUN /usr/bin/pip install virtualenv
@@ -32,9 +32,6 @@ RUN ln -s /usr/local/go/bin/go /usr/local/bin/
 ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
 RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
 
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
 RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
 
 ENV WORKSPACE /arvados
index fcaf0bc0a1bb74cb134847eed3cbde9fc14f0c80..6f7f3faafe8c4e18f580be1f8e341a978cf38c84 100644 (file)
@@ -9,7 +9,7 @@ MAINTAINER Nico Cesar <nico@curoverse.com>
 ENV DEBIAN_FRONTEND noninteractive
 
 # Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip python3-venv python3-dev
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools python3-pip libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip python3-venv python3-dev
 
 # Install virtualenv
 RUN /usr/bin/pip install virtualenv
@@ -33,9 +33,6 @@ RUN ln -s /usr/local/go/bin/go /usr/local/bin/
 ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
 RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
 
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
 RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
 
 ENV WORKSPACE /arvados
index a88435a5b4bb0e98fd646f7ede5e03a45c5490bd..4c01c9e8180fefb268ff80e9d930f339144eb879 100644 (file)
@@ -8,7 +8,7 @@ MAINTAINER Ward Vandewege <ward@curoverse.com>
 ENV DEBIAN_FRONTEND noninteractive
 
 # Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip python3.4-venv python3.4-dev
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools python3-pip libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip python3.4-venv python3.4-dev
 
 # Install virtualenv
 RUN /usr/bin/pip install virtualenv
@@ -32,9 +32,6 @@ RUN ln -s /usr/local/go/bin/go /usr/local/bin/
 ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
 RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
 
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
 RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
 
 ENV WORKSPACE /arvados
index be3911eff3e94ee6907efc883a8da047a4622ae4..a83fc77132315651dfd432877c7fecb5b4090f4e 100644 (file)
@@ -8,7 +8,7 @@ MAINTAINER Ward Vandewege <ward@curoverse.com>
 ENV DEBIAN_FRONTEND noninteractive
 
 # Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev libgnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip tzdata python3-venv python3-dev
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools python3-pip libcurl4-gnutls-dev libgnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip tzdata python3-venv python3-dev
 
 # Install virtualenv
 RUN /usr/bin/pip install virtualenv
@@ -32,9 +32,6 @@ RUN ln -s /usr/local/go/bin/go /usr/local/bin/
 ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
 RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
 
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
 RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
 
 ENV WORKSPACE /arvados
index 8995d14be0bdc55cbdeb47c005c2323f648b6bc4..d0a099911ce91ac9c0e7892d75072310c91e710e 100644 (file)
@@ -32,9 +32,6 @@ RUN ln -s /usr/local/go/bin/go /usr/local/bin/
 ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
 RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
 
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
 RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
 
 ENV WORKSPACE /arvados
index 7186a2209129a08c9c6fbd6a094ce6f0a9dac3c0..7d7e1fc8abf9171df1d905c22478eabf23236299 100755 (executable)
@@ -5,21 +5,24 @@
 
 function usage {
     echo >&2
-    echo >&2 "usage: $0 [options]"
+    echo >&2 "usage: WORKSPACE=/path/to/arvados $0 [options]"
     echo >&2
     echo >&2 "$0 options:"
     echo >&2 "  -t, --tags                    version tag for docker"
+    echo >&2 "  -r, --repo                    Arvados package repot to use: dev, testing, stable (default: dev)"
     echo >&2 "  -u, --upload                  Upload the images (docker push)"
     echo >&2 "  --no-cache                    Don't use build cache"
     echo >&2 "  -h, --help                    Display this help and exit"
     echo >&2
-    echo >&2 "  If no options are given, just builds the images."
+    echo >&2 "  WORKSPACE=path                Path to the Arvados source tree to build from"
+    echo >&2
 }
 upload=false
+REPO=dev
 
 # NOTE: This requires GNU getopt (part of the util-linux package on Debian-based distros).
-TEMP=`getopt -o hut: \
-    --long help,upload,no-cache,tags: \
+TEMP=`getopt -o hut:r: \
+    --long help,upload,no-cache,tags:,repo: \
     -n "$0" -- "$@"`
 
 if [ $? != 0 ] ; then echo "Use -h for help"; exit 1 ; fi
@@ -50,6 +53,19 @@ do
                   ;;
             esac
             ;;
+        -r | --repo)
+            case "$2" in
+                "")
+                  echo "ERROR: --repo needs a parameter";
+                  usage;
+                  exit 1
+                  ;;
+                *)
+                  REPO="$2";
+                  shift 2
+                  ;;
+            esac
+            ;;
         --)
             shift
             break
@@ -69,6 +85,16 @@ exit_cleanly() {
     exit $EXITCODE
 }
 
+# Sanity check
+if ! [[ -n "$WORKSPACE" ]]; then
+    usage;
+    echo >&2 "Error: WORKSPACE environment variable not set"
+    echo >&2
+    exit 1
+fi
+
+echo $WORKSPACE
+
 COLUMNS=80
 . $WORKSPACE/build/run-library.sh
 
@@ -88,16 +114,6 @@ docker_push () {
     checkexit $ECODE "docker push $*"
 }
 
-# Sanity check
-if ! [[ -n "$WORKSPACE" ]]; then
-    echo >&2
-    echo >&2 "Error: WORKSPACE environment variable not set"
-    echo >&2
-    exit 1
-fi
-
-echo $WORKSPACE
-
 # find the docker binary
 DOCKER=`which docker.io`
 
@@ -153,6 +169,7 @@ cd docker/jobs
 docker build $NOCACHE \
        --build-arg python_sdk_version=${python_sdk_version} \
        --build-arg cwl_runner_version=${cwl_runner_version} \
+       --build-arg repo_version=${REPO} \
        -t arvados/jobs:$cwl_runner_version_orig .
 
 ECODE=$?
@@ -175,6 +192,9 @@ if docker --version |grep " 1\.[0-9]\." ; then
     # -f flag removed in Docker 1.12
     FORCE=-f
 fi
+
+#docker export arvados/jobs:$cwl_runner_version_orig | docker import - arvados/jobs:$cwl_runner_version_orig
+
 if ! [[ -z "$version_tag" ]]; then
     docker tag $FORCE arvados/jobs:$cwl_runner_version_orig arvados/jobs:"$version_tag"
 else
index 35f8104450339c80c4f9e2ff92f50cd200f2b1cc..4c5f39a373e66cdf160ac71aaa7edf7fc47cd2e3 100755 (executable)
@@ -118,9 +118,6 @@ if [[ "$DEBUG" != 0 ]]; then
     DASHQ_UNLESS_DEBUG=
 fi
 
-EASY_INSTALL2=$(find_easy_install -$PYTHON2_VERSION "")
-EASY_INSTALL3=$(find_easy_install -$PYTHON3_VERSION 3)
-
 RUN_BUILD_PACKAGES_PATH="`dirname \"$0\"`"
 RUN_BUILD_PACKAGES_PATH="`( cd \"$RUN_BUILD_PACKAGES_PATH\" && pwd )`"  # absolutized and normalized
 if [ -z "$RUN_BUILD_PACKAGES_PATH" ] ; then
index ef1df03d5a97498e0ba15b02dd16cb012a8ba3c8..6264e93f0f1e9b3d2d6634de35a76a0a55fd2588 100755 (executable)
@@ -248,21 +248,6 @@ handle_ruby_gem arvados-login-sync
 # Python packages
 debug_echo -e "\nPython packages\n"
 
-cd "$WORKSPACE/sdk/pam"
-handle_python_package
-
-cd "$WORKSPACE/sdk/python"
-handle_python_package
-
-cd "$WORKSPACE/sdk/cwl"
-handle_python_package
-
-cd "$WORKSPACE/services/fuse"
-handle_python_package
-
-cd "$WORKSPACE/services/nodemanager"
-handle_python_package
-
 # arvados-src
 (
     cd "$WORKSPACE"
index e6b086a5a3b5c2965a4f108e1c3c6a58dc63d2ab..40589fd565c258240fed5fe1057fad5ab38993b1 100755 (executable)
@@ -274,7 +274,7 @@ test_package_presence() {
         echo ${repo_pkg_list} |grep -q ${complete_pkgname}
         if [ $? -eq 0 ] ; then
           echo "Package $complete_pkgname exists, not rebuilding!"
-          curl -o ./${complete_pkgname} http://apt.arvados.org/pool/${D}/main/${repo_subdir}/${complete_pkgname}
+          curl -s -o ./${complete_pkgname} http://apt.arvados.org/pool/${D}/main/${repo_subdir}/${complete_pkgname}
           return 1
         elif test -f "$WORKSPACE/packages/$TARGET/processed/${complete_pkgname}" ; then
           echo "Package $complete_pkgname exists, not rebuilding!"
@@ -287,11 +287,11 @@ test_package_presence() {
     else
       centos_repo="http://rpm.arvados.org/CentOS/7/dev/x86_64/"
 
-      repo_pkg_list=$(curl -o - ${centos_repo})
+      repo_pkg_list=$(curl -s -o - ${centos_repo})
       echo ${repo_pkg_list} |grep -q ${complete_pkgname}
       if [ $? -eq 0 ]; then
         echo "Package $complete_pkgname exists, not rebuilding!"
-        curl -o ./${complete_pkgname} ${centos_repo}${complete_pkgname}
+        curl -s -o ./${complete_pkgname} ${centos_repo}${complete_pkgname}
         return 1
       elif test -f "$WORKSPACE/packages/$TARGET/processed/${complete_pkgname}" ; then
         echo "Package $complete_pkgname exists, not rebuilding!"
@@ -381,16 +381,23 @@ fpm_build_virtualenv () {
 
   local python=""
   case "$PACKAGE_TYPE" in
+    python3)
+        python=python3
+        if [[ "$FORMAT" != "rpm" ]]; then
+          pip=pip3
+        else
+          # In CentOS, we use a different mechanism to get the right version of pip
+          pip=pip
+        fi
+        PACKAGE_PREFIX=$PYTHON3_PKG_PREFIX
+        ;;
     python)
         # All Arvados Python2 packages depend on Python 2.7.
         # Make sure we build with that for consistency.
         python=python2.7
+        pip=pip
         PACKAGE_PREFIX=$PYTHON2_PKG_PREFIX
         ;;
-    python3)
-        PACKAGE_PREFIX=$PYTHON3_PKG_PREFIX
-        python=python3
-        ;;
   esac
 
   if [[ "$PKG" != "libpam-arvados" ]] &&
@@ -410,8 +417,14 @@ fpm_build_virtualenv () {
 
   rm -rf dist/*
 
+  # Get the latest setuptools
+  if ! $pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools; then
+    echo "Error, unable to upgrade setuptools with"
+    echo "  $pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools"
+    exit 1
+  fi
   if ! $python setup.py $DASHQ_UNLESS_DEBUG sdist; then
-    echo "Error, unable to run python setup.py sdist for $PKG"
+    echo "Error, unable to run $python setup.py sdist for $PKG"
     exit 1
   fi
 
@@ -446,28 +459,33 @@ fpm_build_virtualenv () {
     exit 1
   fi
 
-  if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip; then
+  if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip; then
     echo "Error, unable to upgrade pip with"
-    echo "  build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip"
+    echo "  build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip"
+    exit 1
+  fi
+  if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools; then
+    echo "Error, unable to upgrade setuptools with"
+    echo "  build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools"
     exit 1
   fi
-  if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel; then
+  if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel; then
     echo "Error, unable to upgrade wheel with"
-    echo "  build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel"
+    echo "  build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel"
     exit 1
   fi
 
   if [[ "$TARGET" != "centos7" ]] || [[ "$PYTHON_PKG" != "python-arvados-fuse" ]]; then
-    build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
+    build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
   else
     # centos7 needs these special tweaks to install python-arvados-fuse
-    build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG docutils
-    PYCURL_SSL_LIBRARY=nss build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
+    build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG docutils
+    PYCURL_SSL_LIBRARY=nss build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
   fi
 
   if [[ "$?" != "0" ]]; then
     echo "Error, unable to run"
-    echo "  build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH"
+    echo "  build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH"
     exit 1
   fi
 
index 9919c3e1751766892b72bcc806170b61f43e5999..caaca1f31e51677c3881dbf82ea9197ff53660c2 100755 (executable)
@@ -537,7 +537,6 @@ export GOPATH
 (
     set -e
     mkdir -p "$GOPATH/src/git.curoverse.com"
-    rmdir -v --parents --ignore-fail-on-non-empty "${temp}/GOPATH"
     if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
         for d in \
             "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
index 5ea5d45e79c4940ee2ef83230988f4641c57c2b8..aa5cc3b4a5d033c3c163b09ad9a4ad411c3237b4 100644 (file)
@@ -3,7 +3,7 @@
 # SPDX-License-Identifier: AGPL-3.0
 
 [Unit]
-Description=Arvados cloud dispatch
+Description=arvados-dispatch-cloud
 Documentation=https://doc.arvados.org/
 After=network.target
 AssertPathExists=/etc/arvados/config.yml
index 06db793314931b8200651ba24db3df37fd1730f8..ea3640e52a077ba0d5ce626740af691f701f4439 100644 (file)
@@ -73,7 +73,7 @@ h2. Download and tag the latest arvados/jobs docker image
 In order to start workflows from workbench, there needs to be a Docker image tagged @arvados/jobs:latest@. The following command downloads the latest arvados/jobs image from Docker Hub, loads it into Keep, and tags it as 'latest'.  In this example @$project_uuid@ should be the UUID of the "Arvados Standard Docker Images" project.
 
 <notextile>
-<pre><code>~$ <span class="userinput">arv-keepdocker --project-uuid $project_uuid --pull arvados/jobs latest</span>
+<pre><code>~$ <span class="userinput">arv-keepdocker --pull arvados/jobs latest --project-uuid $project_uuid</span>
 </code></pre></notextile>
 
 If the image needs to be downloaded from Docker Hub, the command can take a few minutes to complete, depending on available network bandwidth.
index eda25d5cc6678524ea1cd44b0500201f49e56e65..6e2e6cba6dfeb1873d8a58049e1cdb65ac801bdd 100644 (file)
@@ -32,6 +32,63 @@ TODO: extract this information based on git commit messages and generate changel
 
 h3. current master branch
 
+h4. Stricter collection manifest validation on the API server
+
+As a consequence of "#14482":https://dev.arvados.org/issues/14482, the Ruby SDK now performs more rigorous collection manifest validation. Collections created after 2015-05 are unlikely to be invalid; however, you may check for invalid manifests using the script below.
+
+You could set up a new rvm gemset and install the specific arvados gem for testing, like so:
+
+<notextile>
+<pre><code>~$ <span class="userinput">rvm gemset create rubysdk-test</span>
+~$ <span class="userinput">rvm gemset use rubysdk-test</span>
+~$ <span class="userinput">gem install arvados -v 1.3.1.20190301212059</span>
+</code></pre>
+</notextile>
+
+Next, you can run the following script using admin credentials; it will scan the whole collection database and report any collection that doesn't pass the check:
+
+{% codeblock as ruby %}
+require 'arvados'
+require 'arvados/keep'
+
+api = Arvados.new
+offset = 0
+batch_size = 100
+invalid = []
+
+while true
+    begin
+        req = api.collection.index(
+            :select => [:uuid, :created_at, :manifest_text],
+            :include_trash => true, :include_old_versions => true,
+            :limit => batch_size, :offset => offset)
+    rescue
+        invalid.each {|c| puts "#{c[:uuid]} (Created at #{c[:created_at]}): #{c[:error]}" }
+        raise
+    end
+
+    req[:items].each do |col|
+        begin
+            Keep::Manifest.validate! col[:manifest_text]
+        rescue Exception => e
+            puts "Collection #{col[:uuid]} manifest not valid"
+            invalid << {uuid: col[:uuid], error: e, created_at: col[:created_at]}
+        end
+    end
+    puts "Checked #{offset} / #{req[:items_available]} - Invalid: #{invalid.size}"
+    offset += req[:limit]
+    break if offset > req[:items_available]
+end
+
+if invalid.empty?
+    puts "No invalid collection manifests found"
+else
+    invalid.each {|c| puts "#{c[:uuid]} (Created at #{c[:created_at]}): #{c[:error]}" }
+end
+{% endcodeblock %}
+
+The script will print a final report listing each invalid collection by UUID, with its creation date and error message, so you can take the appropriate corrective measures if needed.
+
 h4. Python packaging change
 
 As part of story "#9945":https://dev.arvados.org/issues/9945, the distribution packaging (deb/rpm) of our Python packages has changed. These packages now include a built-in virtualenv to reduce dependencies on system packages. We have also stopped packaging and publishing backports for all the Python dependencies of our packages, as they are no longer needed.
index e0cc4b8581e65a1a38292f1953418db394f92bee..62017163d2b48540290ff3df29733eebfa759735 100644 (file)
@@ -39,6 +39,7 @@ table(table table-bordered table-condensed).
 |Debian 9 ("stretch")|Supported|Latest|
 |Ubuntu 14.04 ("trusty")|Supported|Latest|
 |Ubuntu 16.04 ("xenial")|Supported|Latest|
+|Ubuntu 18.04 ("bionic")|Supported|Latest|
 |Ubuntu 12.04 ("precise")|EOL|8ed7b6dd5d4df93a3f37096afe6d6f81c2a7ef6e (2017-05-03)|
 |Debian 7 ("wheezy")|EOL|997479d1408139e96ecdb42a60b4f727f814f6c9 (2016-12-28)|
 |CentOS 6 |EOL|997479d1408139e96ecdb42a60b4f727f814f6c9 (2016-12-28)|
@@ -64,7 +65,7 @@ gpgkey=http://rpm.arvados.org/CentOS/RPM-GPG-KEY-curoverse
 
 h3. Debian and Ubuntu
 
-Packages are available for Debian 8 ("jessie"), Debian 9 ("stretch"), Ubuntu 14.04 ("trusty") and Ubuntu 16.04 ("xenial").
+Packages are available for Debian 8 ("jessie"), Debian 9 ("stretch"), Ubuntu 14.04 ("trusty"), Ubuntu 16.04 ("xenial") and Ubuntu 18.04 ("bionic").
 
 First, register the Curoverse signing key in apt's database:
 
@@ -78,6 +79,7 @@ table(table table-bordered table-condensed).
 |Debian 9 ("stretch")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ stretch main" &#x7c; sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
 |Ubuntu 14.04 ("trusty")[1]|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ trusty main" &#x7c; sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
 |Ubuntu 16.04 ("xenial")[1]|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ xenial main" &#x7c; sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
+|Ubuntu 18.04 ("bionic")[1]|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ bionic main" &#x7c; sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
 
 {% include 'notebox_begin' %}
 
index c21fbd9ad2204c0eb056f473879c057ebbc814a7..f34c21a9d741042610e81891ea37848d12380506 100644 (file)
@@ -210,6 +210,6 @@ h2. Share Docker images
 Docker images are subject to normal Arvados permissions.  If you wish to share your Docker image with others (or to share a pipeline template that uses your Docker image), you will need to use @arv-keepdocker@ with the @--project-uuid@ option to upload the image to a shared project.
 
 <notextile>
-<pre><code>$ <span class="userinput">arv-keepdocker --project-uuid qr1hi-j7d0g-xxxxxxxxxxxxxxx arvados/jobs-with-r</span>
+<pre><code>$ <span class="userinput">arv-keepdocker arvados/jobs-with-r --project-uuid qr1hi-j7d0g-xxxxxxxxxxxxxxx</span>
 </code></pre>
 </notextile>
diff --git a/docker/jobs/1078ECD7.key b/docker/jobs/1078ECD7.key
new file mode 100644 (file)
index 0000000..edc62f4
--- /dev/null
@@ -0,0 +1,30 @@
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+
+mQENBEzhgeoBCAChhoK1dqpWzNyDWqRGEvdFdkJaA9D2HRwKPfBfjAoePX6ZyrpA
+ItlUsvt/8s/DRiTiPEFQR4S7VqocmU6whJc3gDEGyOM6b1NF873lIfSVwUoE42QE
+a76dO8woOYgLUyxu2mKG+bJgGMumjBJt6ZOndYVjTYB/7sEeVxwmMVulfZe0s6zg
+ut0+SoTYg2R36qIqeIcWllYt97sEYnyy1qXMis4/3IZnuWkS/frsPR3aeUI4W+o2
+NDN1kj49+LMe7Fb5b7jZY08rZbAWXi1rU1hQx4jC9RvYqlT4HNld4Bn7os1IvOOA
+wNiR0oiVdiuDbBxcMvRPktxMrFVjowusRLq/ABEBAAG0PUN1cm92ZXJzZSwgSW5j
+IEF1dG9tYXRpYyBTaWduaW5nIEtleSA8c3lzYWRtaW5AY3Vyb3ZlcnNlLmNvbT6J
+ATgEEwECACIFAlNgYIECGwMGCwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEFcW
+WREQeOzXPkEH/jQJDIYI1dxWcYiA+hczmpaZvN2/pc/kwIW/6a03+6zqmSNkebOE
+TgoDILacSYc17hy20R1/rWyUstOMKcEgFDBlSehhHyl0f7q/w7d8Ais6MabzsPfx
+IceJpsjUg87+BR7qWhgQ0sxmtIF2TKuTFLs+nkGsgSsiBOEF4NvHxuj3HD4y8F27
+HNqrkqwjLS8xJwwH5Gp2uMEVr1AXIH3iSRjJ8X124s8iEP97Q/3IazoYRf9/MCSm
+QEx8KzxwDX6t4bW6O4D01K+e9gdkTY70dcMgJoqm5IsX7yxjEubiOunphtlJnZ9d
+Oi1yBN5UM3pWKAdcfRj4rcfV9Simvpx9av+5AQ0ETOGB6gEIAMAA0HVMG0BbdnU7
+wWgl5eFdT0AUSrXK/WdcKqVEGGv+c68NETSHWZOJX7O46Eao4gY4cTYprVMBzxpY
+/BtQSYLpE0HLvBc1fcFd61Yz4H/9rGSNY0GcIQEbOjbJY5mr8qFsQ1K/mAf3aUL3
+b6ni4sHVicRiRr0Gl4Ihorlskpfu1SHs/C5tvTSVNF9p4vtl5892y1yILQeVpcBs
+NCR7MUpdS49xCpvnAWsDZX+ij6LTR3lzCm/ZLCg4gNuZkjgU9oqVfGkqysW7WZ8S
+OLvzAwUw7i1EIFX8q6QdudGoezxz8m8OgZM1v8AFpYEKlhEPf1W0MSfaRDwrj866
+8nCLruEAEQEAAYkBHwQYAQIACQUCTOGB6gIbDAAKCRBXFlkREHjs199EB/4+p0G1
+3PHxt6rLWSCGXobDOu4ZOA/qnv0D/JhOLroFds5TzQv6vnS8eAkhCTjHVA+b58cm
+kXpI0oYcD4ZP+KK1CHKq2rGfwou7HfAF+icnNqYkeBOkjjbCgkvBlcCInuAuU8JX
+DZMkfFk52+eBKwTjS/J/fQp0vDru8bHLp98WgdRHWfJQ3mc3gz4A5sR6zhrGPW6/
+ssnROS4dC2Ohp35GpgN1KjD3EmEw5RoSBYlyrARCaMsivgIKMxGUEyFZWhuJt3N1
+2MTddRwz28hbmYCi+MzHYDbRv+cSyUDmvXaWhfkNKBepClBA1rTWBcldit5vvlqr
+yPet6wIKrtLGhAqZ
+=CLkG
+-----END PGP PUBLIC KEY BLOCK-----
index c0fe145db1b292ebc55c536b9d0e7786a7b06daa..02a1c3829d432e284a770d11459aaa111bff57db 100644 (file)
@@ -2,29 +2,33 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-# Based on Debian Jessie
-FROM debian:jessie
-MAINTAINER Ward Vandewege <ward@curoverse.com>
+# Based on Debian Stretch
+FROM debian:stretch
+MAINTAINER Ward Vandewege <wvandewege@veritasgenetics.com>
 
 ENV DEBIAN_FRONTEND noninteractive
 
-ADD apt.arvados.org.list /etc/apt/sources.list.d/
-RUN apt-key adv --keyserver pool.sks-keyservers.net --recv 1078ECD7
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3
+RUN apt-get update -q
+RUN apt-get install -yq --no-install-recommends gnupg
+
+ARG repo_version
+RUN echo repo_version $repo_version
+ADD apt.arvados.org-$repo_version.list /etc/apt/sources.list.d/
+
+ADD 1078ECD7.key /tmp/
+RUN cat /tmp/1078ECD7.key | apt-key add -
 
 ARG python_sdk_version
 ARG cwl_runner_version
 RUN echo cwl_runner_version $cwl_runner_version python_sdk_version $python_sdk_version
 
 RUN apt-get update -q
-RUN apt-get install -yq --no-install-recommends \
-    git python-pip python-virtualenv \
-    python-dev libgnutls28-dev libcurl4-gnutls-dev nodejs \
+RUN apt-get install -yq --no-install-recommends nodejs \
     python-arvados-python-client=$python_sdk_version \
     python-arvados-cwl-runner=$cwl_runner_version
 
-# use the Python executable from the python-arvados-python-client package
-RUN rm -f /usr/bin/python && ln -s /usr/share/python2.7/dist/python-arvados-python-client/bin/python /usr/bin/python
+# use the Python executable from the python-arvados-cwl-runner package
+RUN rm -f /usr/bin/python && ln -s /usr/share/python2.7/dist/python-arvados-cwl-runner/bin/python /usr/bin/python
 
 # Install dependencies and set up system.
 RUN /usr/sbin/adduser --disabled-password \
diff --git a/docker/jobs/apt.arvados.org-dev.list b/docker/jobs/apt.arvados.org-dev.list
new file mode 100644 (file)
index 0000000..468000e
--- /dev/null
@@ -0,0 +1,2 @@
+# apt.arvados.org
+deb http://apt.arvados.org/ stretch-dev main
diff --git a/docker/jobs/apt.arvados.org-stable.list b/docker/jobs/apt.arvados.org-stable.list
new file mode 100644 (file)
index 0000000..afbc51e
--- /dev/null
@@ -0,0 +1,2 @@
+# apt.arvados.org
+deb http://apt.arvados.org/ stretch main
diff --git a/docker/jobs/apt.arvados.org-testing.list b/docker/jobs/apt.arvados.org-testing.list
new file mode 100644 (file)
index 0000000..c8ea91d
--- /dev/null
@@ -0,0 +1,2 @@
+# apt.arvados.org
+deb http://apt.arvados.org/ stretch-testing main
diff --git a/docker/jobs/apt.arvados.org.list b/docker/jobs/apt.arvados.org.list
deleted file mode 100644 (file)
index 11b98e2..0000000
+++ /dev/null
@@ -1,2 +0,0 @@
-# apt.arvados.org
-deb http://apt.arvados.org/ jessie-dev main
index d745e7e54d27473147e3f214a213a4514a596131..d19e4bef2372ff87dfb13e26db1ffddb9726bdca 100644 (file)
@@ -47,6 +47,14 @@ type azureInstanceSetConfig struct {
        StorageAccount               string
        BlobContainer                string
        DeleteDanglingResourcesAfter arvados.Duration
+       AdminUsername                string
+}
+
+const tagKeyInstanceSecret = "InstanceSecret"
+
+type containerWrapper interface {
+       GetBlobReference(name string) *storage.Blob
+       ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
 }
 
 type virtualMachinesClientWrapper interface {
@@ -189,20 +197,20 @@ func wrapAzureError(err error) error {
 }
 
 type azureInstanceSet struct {
-       azconfig          azureInstanceSetConfig
-       vmClient          virtualMachinesClientWrapper
-       netClient         interfacesClientWrapper
-       storageAcctClient storageacct.AccountsClient
-       azureEnv          azure.Environment
-       interfaces        map[string]network.Interface
-       dispatcherID      string
-       namePrefix        string
-       ctx               context.Context
-       stopFunc          context.CancelFunc
-       stopWg            sync.WaitGroup
-       deleteNIC         chan string
-       deleteBlob        chan storage.Blob
-       logger            logrus.FieldLogger
+       azconfig     azureInstanceSetConfig
+       vmClient     virtualMachinesClientWrapper
+       netClient    interfacesClientWrapper
+       blobcont     containerWrapper
+       azureEnv     azure.Environment
+       interfaces   map[string]network.Interface
+       dispatcherID string
+       namePrefix   string
+       ctx          context.Context
+       stopFunc     context.CancelFunc
+       stopWg       sync.WaitGroup
+       deleteNIC    chan string
+       deleteBlob   chan storage.Blob
+       logger       logrus.FieldLogger
 }
 
 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
@@ -212,12 +220,14 @@ func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetI
                return nil, err
        }
 
-       ap := azureInstanceSet{logger: logger}
-       err = ap.setup(azcfg, string(dispatcherID))
+       az := azureInstanceSet{logger: logger}
+       az.ctx, az.stopFunc = context.WithCancel(context.Background())
+       err = az.setup(azcfg, string(dispatcherID))
        if err != nil {
+               az.stopFunc()
                return nil, err
        }
-       return &ap, nil
+       return &az, nil
 }
 
 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
@@ -248,12 +258,26 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
 
        az.vmClient = &virtualMachinesClientImpl{vmClient}
        az.netClient = &interfacesClientImpl{netClient}
-       az.storageAcctClient = storageAcctClient
+
+       result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
+       if err != nil {
+               az.logger.WithError(err).Warn("Couldn't get account keys")
+               return err
+       }
+
+       key1 := *(*result.Keys)[0].Value
+       client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
+       if err != nil {
+               az.logger.WithError(err).Warn("Couldn't make client")
+               return err
+       }
+
+       blobsvc := client.GetBlobService()
+       az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
 
        az.dispatcherID = dispatcherID
        az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
 
-       az.ctx, az.stopFunc = context.WithCancel(context.Background())
        go func() {
                az.stopWg.Add(1)
                defer az.stopWg.Done()
@@ -311,15 +335,12 @@ func (az *azureInstanceSet) Create(
        instanceType arvados.InstanceType,
        imageID cloud.ImageID,
        newTags cloud.InstanceTags,
+       initCommand cloud.InitCommand,
        publicKey ssh.PublicKey) (cloud.Instance, error) {
 
        az.stopWg.Add(1)
        defer az.stopWg.Done()
 
-       if len(newTags["node-token"]) == 0 {
-               return nil, fmt.Errorf("Must provide tag 'node-token'")
-       }
-
        name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
        if err != nil {
                return nil, err
@@ -336,8 +357,6 @@ func (az *azureInstanceSet) Create(
                tags["dispatch-"+k] = &newstr
        }
 
-       tags["dispatch-instance-type"] = &instanceType.Name
-
        nicParameters := network.Interface{
                Location: &az.azconfig.Location,
                Tags:     tags,
@@ -365,14 +384,14 @@ func (az *azureInstanceSet) Create(
                return nil, wrapAzureError(err)
        }
 
-       instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
+       blobname := fmt.Sprintf("%s-os.vhd", name)
+       instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
                az.azconfig.StorageAccount,
                az.azureEnv.StorageEndpointSuffix,
                az.azconfig.BlobContainer,
-               name)
+               blobname)
 
-       customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
-echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
+       customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
 
        vmParameters := compute.VirtualMachine{
                Location: &az.azconfig.Location,
@@ -406,13 +425,13 @@ echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
                        },
                        OsProfile: &compute.OSProfile{
                                ComputerName:  &name,
-                               AdminUsername: to.StringPtr("crunch"),
+                               AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
                                LinuxConfiguration: &compute.LinuxConfiguration{
                                        DisablePasswordAuthentication: to.BoolPtr(true),
                                        SSH: &compute.SSHConfiguration{
                                                PublicKeys: &[]compute.SSHPublicKey{
-                                                       compute.SSHPublicKey{
-                                                               Path:    to.StringPtr("/home/crunch/.ssh/authorized_keys"),
+                                                       {
+                                                               Path:    to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
                                                                KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
                                                        },
                                                },
@@ -425,6 +444,16 @@ echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
 
        vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
        if err != nil {
+               _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
+               if delerr != nil {
+                       az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
+               }
+
+               _, delerr = az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
+               if delerr != nil {
+                       az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
+               }
+
                return nil, wrapAzureError(err)
        }
 
@@ -494,8 +523,8 @@ func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
                                if result.Value().Tags["created-at"] != nil {
                                        createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
                                        if err == nil {
-                                               if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
-                                                       az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
+                                               if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
+                                                       az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
                                                        az.deleteNIC <- *result.Value().Name
                                                }
                                        }
@@ -512,27 +541,12 @@ func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
 // leased to a VM) and haven't been modified for
 // DeleteDanglingResourcesAfter seconds.
 func (az *azureInstanceSet) manageBlobs() {
-       result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
-       if err != nil {
-               az.logger.WithError(err).Warn("Couldn't get account keys")
-               return
-       }
-
-       key1 := *(*result.Keys)[0].Value
-       client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
-       if err != nil {
-               az.logger.WithError(err).Warn("Couldn't make client")
-               return
-       }
-
-       blobsvc := client.GetBlobService()
-       blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
 
        page := storage.ListBlobsParameters{Prefix: az.namePrefix}
        timestamp := time.Now()
 
        for {
-               response, err := blobcont.ListBlobs(page)
+               response, err := az.blobcont.ListBlobs(page)
                if err != nil {
                        az.logger.WithError(err).Warn("Error listing blobs")
                        return
@@ -631,66 +645,19 @@ func (ai *azureInstance) Destroy() error {
 }
 
 func (ai *azureInstance) Address() string {
-       return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
-}
-
-func (ai *azureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
-       ai.provider.stopWg.Add(1)
-       defer ai.provider.stopWg.Done()
-
-       remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
-
-       tags := ai.Tags()
-
-       tg := tags["ssh-pubkey-fingerprint"]
-       if tg != "" {
-               if remoteFingerprint == tg {
-                       return nil
-               }
-               return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
-       }
-
-       nodetokenTag := tags["node-token"]
-       if nodetokenTag == "" {
-               return fmt.Errorf("Missing node token tag")
-       }
+       if ai.nic.IPConfigurations != nil &&
+               len(*ai.nic.IPConfigurations) > 0 &&
+               (*ai.nic.IPConfigurations)[0].PrivateIPAddress != nil {
 
-       sess, err := client.NewSession()
-       if err != nil {
-               return err
-       }
-
-       nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
-       if err != nil {
-               return err
+               return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
        }
+       return ""
+}
 
-       nodetoken := strings.TrimSpace(string(nodetokenbytes))
-
-       expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
-
-       if strings.TrimSpace(nodetoken) != expectedToken {
-               return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
-       }
-
-       sess, err = client.NewSession()
-       if err != nil {
-               return err
-       }
-
-       keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
-       if err != nil {
-               return err
-       }
-
-       sp := strings.Split(string(keyfingerprintbytes), " ")
-
-       if remoteFingerprint != sp[1] {
-               return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
-       }
+func (ai *azureInstance) RemoteUser() string {
+       return ai.provider.azconfig.AdminUsername
+}
 
-       tags["ssh-pubkey-fingerprint"] = sp[1]
-       delete(tags, "node-token")
-       ai.SetTags(tags)
-       return nil
+func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+       return cloud.ErrNotImplemented
 }
index 850a3fb4270fd9c5da7b7829e9f00fa27c630a49..72832578dfa68ea022c75a6a6a69ef35018be15c 100644 (file)
@@ -25,6 +25,7 @@
 //      StorageAccount: example
 //      BlobContainer: vhds
 //      DeleteDanglingResourcesAfter: 20s
+//      AdminUsername: crunch
 
 package azure
 
@@ -50,7 +51,6 @@ import (
        "github.com/Azure/go-autorest/autorest"
        "github.com/Azure/go-autorest/autorest/azure"
        "github.com/Azure/go-autorest/autorest/to"
-       "github.com/jmcvetta/randutil"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
        check "gopkg.in/check.v1"
@@ -105,6 +105,16 @@ func (*InterfacesClientStub) listComplete(ctx context.Context, resourceGroupName
        return network.InterfaceListResultIterator{}, nil
 }
 
+type BlobContainerStub struct{}
+
+func (*BlobContainerStub) GetBlobReference(name string) *storage.Blob {
+       return nil
+}
+
+func (*BlobContainerStub) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+       return storage.BlobListResponse{}, nil
+}
+
 type testConfig struct {
        ImageIDForTestSuite string
        DriverParameters    json.RawMessage
@@ -148,6 +158,7 @@ func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error)
        ap.ctx, ap.stopFunc = context.WithCancel(context.Background())
        ap.vmClient = &VirtualMachinesClientStub{}
        ap.netClient = &InterfacesClientStub{}
+       ap.blobcont = &BlobContainerStub{}
        return &ap, cloud.ImageID("blob"), cluster, nil
 }
 
@@ -160,18 +171,16 @@ func (*AzureInstanceSetSuite) TestCreate(c *check.C) {
        pk, _, _, _, err := ssh.ParseAuthorizedKey(testKey)
        c.Assert(err, check.IsNil)
 
-       nodetoken, err := randutil.String(40, "abcdefghijklmnopqrstuvwxyz0123456789")
-       c.Assert(err, check.IsNil)
-
        inst, err := ap.Create(cluster.InstanceTypes["tiny"],
                img, map[string]string{
-                       "node-token": nodetoken},
-               pk)
+                       "TestTagName": "test tag value",
+               }, "umask 0600; echo -n test-file-data >/var/run/test-file", pk)
 
        c.Assert(err, check.IsNil)
 
-       tg := inst.Tags()
-       log.Printf("Result %v %v %v", inst.String(), inst.Address(), tg)
+       tags := inst.Tags()
+       c.Check(tags["TestTagName"], check.Equals, "test tag value")
+       c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
 
 }
 
@@ -306,19 +315,22 @@ func (*AzureInstanceSetSuite) TestSSH(c *check.C) {
        c.Assert(err, check.IsNil)
 
        if len(l) > 0 {
-
                sshclient, err := SetupSSHClient(c, l[0])
                c.Assert(err, check.IsNil)
+               defer sshclient.Conn.Close()
 
                sess, err := sshclient.NewSession()
                c.Assert(err, check.IsNil)
-
-               out, err := sess.Output("cat /home/crunch/node-token")
+               defer sess.Close()
+               _, err = sess.Output("find /var/run/test-file -maxdepth 0 -user root -perm 0600")
                c.Assert(err, check.IsNil)
 
-               log.Printf("%v", string(out))
-
-               sshclient.Conn.Close()
+               sess, err = sshclient.NewSession()
+               c.Assert(err, check.IsNil)
+               defer sess.Close()
+               out, err := sess.Output("sudo cat /var/run/test-file")
+               c.Assert(err, check.IsNil)
+               c.Check(string(out), check.Equals, "test-file-data")
        }
 }
 
index 46a2c1682404ec067bc7c332a77c469f0b353d57..792e737a914a1ce7d39d98c05c1a9428e77fb1ff 100644 (file)
@@ -6,6 +6,7 @@ package cloud
 
 import (
        "encoding/json"
+       "errors"
        "io"
        "time"
 
@@ -57,17 +58,25 @@ type Executor interface {
        Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
 }
 
+var ErrNotImplemented = errors.New("not implemented")
+
 // An ExecutorTarget is a remote command execution service.
 type ExecutorTarget interface {
        // SSH server hostname or IP address, or empty string if
        // unknown while instance is booting.
        Address() string
 
+       // Remote username to send during SSH authentication.
+       RemoteUser() string
+
        // Return nil if the given public key matches the instance's
        // SSH server key. If the provided Dialer is not nil,
        // VerifyHostKey can use it to make outgoing network
        // connections from the instance -- e.g., to use the cloud's
        // "this instance's metadata" API.
+       //
+       // Return ErrNotImplemented if no verification mechanism is
+       // available.
        VerifyHostKey(ssh.PublicKey, *ssh.Client) error
 }
 
@@ -102,12 +111,18 @@ type Instance interface {
 // All public methods of an InstanceSet, and all public methods of the
 // instances it returns, are goroutine safe.
 type InstanceSet interface {
-       // Create a new instance. If supported by the driver, add the
+       // Create a new instance with the given type, image, and
+       // initial set of tags. If supported by the driver, add the
        // provided public key to /root/.ssh/authorized_keys.
        //
+       // The given InitCommand should be executed on the newly
+       // created instance. This is optional for a driver whose
+       // instances' VerifyHostKey() method never returns
+       // ErrNotImplemented. InitCommand will be under 1 KiB.
+       //
        // The returned error should implement RateLimitError and
        // QuotaError where applicable.
-       Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+       Create(arvados.InstanceType, ImageID, InstanceTags, InitCommand, ssh.PublicKey) (Instance, error)
 
        // Return all instances, including ones that are booting or
        // shutting down. Optionally, filter out nodes that don't have
@@ -125,6 +140,8 @@ type InstanceSet interface {
        Stop()
 }
 
+type InitCommand string
+
 // A Driver returns an InstanceSet that uses the given InstanceSetID
 // and driver-dependent configuration parameters.
 //
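
A minimal caller-side sketch of the revised Create() signature, mirroring the tag names and secret file path used by the worker pool later in this commit. The helper name, image ID handling, and tag literals here are illustrative, not part of the Arvados tree.

    package example

    import (
            "fmt"

            "git.curoverse.com/arvados.git/lib/cloud"
            "git.curoverse.com/arvados.git/sdk/go/arvados"
            "golang.org/x/crypto/ssh"
    )

    // createWithSecret passes the new InitCommand argument: a shell
    // command that stores a per-instance secret which the dispatcher can
    // later read back to confirm it reached the intended host.
    func createWithSecret(is cloud.InstanceSet, it arvados.InstanceType, img cloud.ImageID, pub ssh.PublicKey, secret string) (cloud.Instance, error) {
            tags := cloud.InstanceTags{
                    "InstanceType":   it.Name,
                    "InstanceSecret": secret,
            }
            cmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >/var/run/arvados-instance-secret", secret))
            return is.Create(it, img, tags, cmd, pub)
    }
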
index 94eb2580bd759c9c781dc2346353440124207899..c1d4657ba47b7801b63bad0222e4a2df71f7881d 100644 (file)
@@ -5,6 +5,8 @@
 package controller
 
 import (
+       "context"
+
        "git.curoverse.com/arvados.git/lib/cmd"
        "git.curoverse.com/arvados.git/lib/service"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -12,6 +14,6 @@ import (
 
 var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
 
-func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+func newHandler(_ context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
        return &Handler{Cluster: cluster, NodeProfile: np}
 }
index d49d16a35eee1be2fe3cbe0f765921316fdafa64..62916acd2ac10be14d90d4e02e2703e77949e32b 100644 (file)
@@ -19,6 +19,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "github.com/sirupsen/logrus"
@@ -29,7 +30,7 @@ import (
 var _ = check.Suite(&FederationSuite{})
 
 type FederationSuite struct {
-       log *logrus.Logger
+       log logrus.FieldLogger
        // testServer and testHandler are the controller being tested,
        // "zhome".
        testServer  *httpserver.Server
@@ -44,9 +45,7 @@ type FederationSuite struct {
 }
 
 func (s *FederationSuite) SetUpTest(c *check.C) {
-       s.log = logrus.New()
-       s.log.Formatter = &logrus.JSONFormatter{}
-       s.log.Out = &logWriter{c.Log}
+       s.log = ctxlog.TestLogger(c)
 
        s.remoteServer = newServerFromIntegrationTestEnv(c)
        c.Assert(s.remoteServer.Start(), check.IsNil)
index f11228a31350b93f2da70a7b5ab46b8926a47b06..dfe60d90a5f3119909658149b1017f3b782515f3 100644 (file)
@@ -5,6 +5,7 @@
 package controller
 
 import (
+       "context"
        "encoding/json"
        "net/http"
        "net/http/httptest"
@@ -16,6 +17,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        check "gopkg.in/check.v1"
 )
@@ -30,9 +32,13 @@ var _ = check.Suite(&HandlerSuite{})
 type HandlerSuite struct {
        cluster *arvados.Cluster
        handler http.Handler
+       ctx     context.Context
+       cancel  context.CancelFunc
 }
 
 func (s *HandlerSuite) SetUpTest(c *check.C) {
+       s.ctx, s.cancel = context.WithCancel(context.Background())
+       s.ctx = ctxlog.Context(s.ctx, ctxlog.New(os.Stderr, "json", "debug"))
        s.cluster = &arvados.Cluster{
                ClusterID:  "zzzzz",
                PostgreSQL: integrationTestCluster().PostgreSQL,
@@ -44,7 +50,11 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
                },
        }
        node := s.cluster.NodeProfiles["*"]
-       s.handler = newHandler(s.cluster, &node)
+       s.handler = newHandler(s.ctx, s.cluster, &node)
+}
+
+func (s *HandlerSuite) TearDownTest(c *check.C) {
+       s.cancel()
 }
 
 func (s *HandlerSuite) TestProxyDiscoveryDoc(c *check.C) {
index 95f17e79e6d8370e8c211f928264dff9869189c2..ae89c3d7ea4d073fa44885f193af138f81b85508 100644 (file)
@@ -5,28 +5,16 @@
 package controller
 
 import (
-       "bytes"
        "net/http"
        "os"
        "path/filepath"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
-// logWriter is an io.Writer that writes by calling a "write log"
-// function, typically (*check.C)Log().
-type logWriter struct {
-       logfunc func(...interface{})
-}
-
-func (tl *logWriter) Write(buf []byte) (int, error) {
-       tl.logfunc(string(bytes.TrimRight(buf, "\n")))
-       return len(buf), nil
-}
-
 func integrationTestCluster() *arvados.Cluster {
        cfg, err := arvados.GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
        if err != nil {
@@ -42,9 +30,7 @@ func integrationTestCluster() *arvados.Cluster {
 // Return a new unstarted controller server, using the Rails API
 // provided by the integration-testing environment.
 func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
-       log := logrus.New()
-       log.Formatter = &logrus.JSONFormatter{}
-       log.Out = &logWriter{c.Log}
+       log := ctxlog.TestLogger(c)
 
        nodeProfile := arvados.NodeProfile{
                Controller: arvados.SystemServiceInstance{Listen: ":"},
index 92948fb300e703971e59957b4f6f98db176a42ef..7231e839475639c2aa5e6c720091c15b4d4b5ed7 100644 (file)
@@ -5,6 +5,8 @@
 package dispatchcloud
 
 import (
+       "context"
+
        "git.curoverse.com/arvados.git/lib/cmd"
        "git.curoverse.com/arvados.git/lib/service"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -12,8 +14,8 @@ import (
 
 var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
 
-func newHandler(cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
-       d := &dispatcher{Cluster: cluster}
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
+       d := &dispatcher{Cluster: cluster, Context: ctx}
        go d.Start()
        return d
 }
index 297782c35b9a972668fd7623d251e3411b26389b..bbe47625a893d6874d2c3c415952948f290de74f 100644 (file)
@@ -131,7 +131,7 @@ func (cq *Queue) Forget(uuid string) {
        defer cq.mtx.Unlock()
        ctr := cq.current[uuid].Container
        if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
-               delete(cq.current, uuid)
+               cq.delEnt(uuid, ctr.State)
        }
 }
 
@@ -196,7 +196,7 @@ func (cq *Queue) Update() error {
                        cq.current[uuid] = cur
                }
        }
-       for uuid := range cq.current {
+       for uuid, ent := range cq.current {
                if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
                        // Don't expunge an entry that was
                        // added/updated locally after we started
@@ -207,7 +207,7 @@ func (cq *Queue) Update() error {
                        // the poll response (evidently it's
                        // cancelled, completed, deleted, or taken by
                        // a different dispatcher).
-                       delete(cq.current, uuid)
+                       cq.delEnt(uuid, ent.Container.State)
                }
        }
        cq.dontupdate = nil
@@ -216,6 +216,15 @@ func (cq *Queue) Update() error {
        return nil
 }
 
+// Caller must have lock.
+func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
+       cq.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "State":         state,
+       }).Info("dropping container from queue")
+       delete(cq.current, uuid)
+}
+
 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
        it, err := cq.chooseType(&ctr)
        if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
@@ -269,6 +278,12 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
                }()
                return
        }
+       cq.logger.WithFields(logrus.Fields{
+               "ContainerUUID": ctr.UUID,
+               "State":         ctr.State,
+               "Priority":      ctr.Priority,
+               "InstanceType":  it.Name,
+       }).Info("adding container to queue")
        cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
 }
 
index 2d73afcd2624ed1e55041f25eda9e91fe0f8f8ed..adf1028b35fe16ab13afbfcb4f0c91672ec17849 100644 (file)
@@ -5,6 +5,7 @@
 package dispatchcloud
 
 import (
+       "context"
        "crypto/md5"
        "encoding/json"
        "fmt"
@@ -20,6 +21,7 @@ import (
        "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "github.com/julienschmidt/httprouter"
        "github.com/prometheus/client_golang/prometheus"
@@ -42,6 +44,7 @@ type pool interface {
 
 type dispatcher struct {
        Cluster       *arvados.Cluster
+       Context       context.Context
        InstanceSetID cloud.InstanceSetID
 
        logger      logrus.FieldLogger
@@ -116,7 +119,7 @@ func (disp *dispatcher) initialize() {
        }
        disp.stop = make(chan struct{}, 1)
        disp.stopped = make(chan struct{})
-       disp.logger = logrus.StandardLogger()
+       disp.logger = ctxlog.FromContext(disp.Context)
 
        if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
                disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
@@ -130,7 +133,7 @@ func (disp *dispatcher) initialize() {
        }
        disp.instanceSet = instanceSet
        disp.reg = prometheus.NewRegistry()
-       disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+       disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
        disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
 
        if disp.Cluster.ManagementToken == "" {
@@ -141,9 +144,9 @@ func (disp *dispatcher) initialize() {
                mux := httprouter.New()
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
-               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/hold", disp.apiInstanceHold)
-               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/drain", disp.apiInstanceDrain)
-               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/run", disp.apiInstanceRun)
+               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
+               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
+               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
                metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
                        ErrorLog: disp.logger,
                })
@@ -166,7 +169,7 @@ func (disp *dispatcher) run() {
        if pollInterval <= 0 {
                pollInterval = defaultPollInterval
        }
-       sched := scheduler.New(disp.logger, disp.queue, disp.pool, staleLockTimeout, pollInterval)
+       sched := scheduler.New(disp.Context, disp.queue, disp.pool, staleLockTimeout, pollInterval)
        sched.Start()
        defer sched.Stop()
 
@@ -210,8 +213,11 @@ func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
 }
 
 func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
-       params, _ := r.Context().Value(httprouter.ParamsKey).(httprouter.Params)
-       id := cloud.InstanceID(params.ByName("instance_id"))
+       id := cloud.InstanceID(r.FormValue("instance_id"))
+       if id == "" {
+               httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+               return
+       }
        err := disp.pool.SetIdleBehavior(id, want)
        if err != nil {
                httpserver.Error(w, err.Error(), http.StatusNotFound)
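
Since the hold/drain/run endpoints no longer embed the instance ID in the URL path, callers supply it as a form value. A hedged client sketch, assuming the management token is accepted as a Bearer credential; host, token, and instance ID are placeholders.

    package example

    import (
            "net/http"
            "net/url"
            "strings"
    )

    // holdInstance posts to the dispatcher's management API to put one
    // instance into the "hold" idle behavior.
    func holdInstance(dispatchURL, mgmtToken, instanceID string) (*http.Response, error) {
            form := url.Values{"instance_id": {instanceID}}
            req, err := http.NewRequest("POST", dispatchURL+"/arvados/v1/dispatch/instances/hold", strings.NewReader(form.Encode()))
            if err != nil {
                    return nil, err
            }
            req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
            req.Header.Set("Authorization", "Bearer "+mgmtToken)
            return http.DefaultClient.Do(req)
    }
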
index 0558d79f1a5b95737ab61958e467a7ee73177df9..36b06020748f43f5f4c7bbdefb5302935dedb861 100644 (file)
@@ -5,6 +5,7 @@
 package dispatchcloud
 
 import (
+       "context"
        "encoding/json"
        "io/ioutil"
        "math/rand"
@@ -16,6 +17,7 @@ import (
 
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "golang.org/x/crypto/ssh"
        check "gopkg.in/check.v1"
 )
@@ -23,12 +25,16 @@ import (
 var _ = check.Suite(&DispatcherSuite{})
 
 type DispatcherSuite struct {
+       ctx        context.Context
+       cancel     context.CancelFunc
        cluster    *arvados.Cluster
        stubDriver *test.StubDriver
        disp       *dispatcher
 }
 
 func (s *DispatcherSuite) SetUpTest(c *check.C) {
+       s.ctx, s.cancel = context.WithCancel(context.Background())
+       s.ctx = ctxlog.Context(s.ctx, ctxlog.TestLogger(c))
        dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
        dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
        c.Assert(err, check.IsNil)
@@ -73,13 +79,17 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                        },
                },
        }
-       s.disp = &dispatcher{Cluster: s.cluster}
+       s.disp = &dispatcher{
+               Cluster: s.cluster,
+               Context: s.ctx,
+       }
        // Test cases can modify s.cluster before calling
        // initialize(), and then modify private state before calling
        // go run().
 }
 
 func (s *DispatcherSuite) TearDownTest(c *check.C) {
+       s.cancel()
        s.disp.Close()
 }
 
index d7f4585619417904a1125bc05d54d58499199179..6fb46b5f46f9d36c2500ee759cee1a3ec19c59f0 100644 (file)
@@ -46,7 +46,7 @@ func estimateDockerImageSize(collectionPDH string) int64 {
        // the size of the manifest.
        //
        // Use the following heuristic:
-       // - Start with the length of the mainfest (n)
+       // - Start with the length of the manifest (n)
        // - Subtract 80 characters for the filename and file segment
        // - Divide by 42 to get the number of block identifiers ('hash\+size\ ' is 32+1+8+1)
        // - Assume each block is full, multiply by 64 MiB
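
Restating the heuristic as code, as a sketch only: the constants come from the comment above, and the one-block lower bound is an assumption added for illustration.

    // estimatedImageSize applies the manifest-length heuristic: subtract
    // ~80 chars of filename/segment overhead, count ~42-char block
    // identifiers, and assume each block is a full 64 MiB.
    func estimatedImageSize(manifestLength int64) int64 {
            blocks := (manifestLength - 80) / 42
            if blocks < 1 {
                    blocks = 1 // assumption: treat tiny manifests as one block
            }
            return blocks * (64 << 20)
    }
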
index 4bd27021c675d1c8ce40753d131d0631041ea59c..148b653c2e52305b2ece2255c49d98bf6cb72f50 100644 (file)
@@ -23,7 +23,7 @@ func (sch *Scheduler) fixStaleLocks() {
        var stale []string
        timeout := time.NewTimer(sch.staleLockTimeout)
 waiting:
-       for {
+       for sch.pool.CountWorkers()[worker.StateUnknown] > 0 {
                running := sch.pool.Running()
                qEntries, _ := sch.queue.Entries()
 
@@ -43,11 +43,6 @@ waiting:
 
                select {
                case <-wp:
-                       // Stop waiting if all workers have been
-                       // contacted.
-                       if sch.pool.CountWorkers()[worker.StateUnknown] == 0 {
-                               break waiting
-                       }
                case <-timeout.C:
                        // Give up.
                        break waiting
index ecdae7f876786b948019b871d3018e3e42b48901..d102d2fd2041c71d8a7a60f0d5fed4730119875b 100644 (file)
@@ -6,6 +6,7 @@ package scheduler
 
 import (
        "sort"
+       "time"
 
        "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -50,7 +51,7 @@ tryrun:
                                overquota = sorted[i:]
                                break tryrun
                        }
-                       sch.bgLock(logger, ctr.UUID)
+                       go sch.lockContainer(logger, ctr.UUID)
                        unalloc[it]--
                case arvados.ContainerStateLocked:
                        if unalloc[it] > 0 {
@@ -120,22 +121,16 @@ tryrun:
        }
 }
 
-// Start an API call to lock the given container, and return
-// immediately while waiting for the response in a new goroutine. Do
-// nothing if a lock request is already in progress for this
-// container.
-func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
-       logger.Debug("locking")
-       sch.mtx.Lock()
-       defer sch.mtx.Unlock()
-       if sch.locking[uuid] {
-               logger.Debug("locking in progress, doing nothing")
+// Lock the given container. Should be called in a new goroutine.
+func (sch *Scheduler) lockContainer(logger logrus.FieldLogger, uuid string) {
+       if !sch.uuidLock(uuid, "lock") {
                return
        }
+       defer sch.uuidUnlock(uuid)
        if ctr, ok := sch.queue.Get(uuid); !ok || ctr.State != arvados.ContainerStateQueued {
                // This happens if the container has been cancelled or
                // locked since runQueue called sch.queue.Entries(),
-               // possibly by a bgLock() call from a previous
+               // possibly by a lockContainer() call from a previous
                // runQueue iteration. In any case, we will respond
                // appropriately on the next runQueue iteration, which
                // will have already been triggered by the queue
@@ -143,24 +138,50 @@ func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
                logger.WithField("State", ctr.State).Debug("container no longer queued by the time we decided to lock it, doing nothing")
                return
        }
-       sch.locking[uuid] = true
-       go func() {
-               defer func() {
-                       sch.mtx.Lock()
-                       defer sch.mtx.Unlock()
-                       delete(sch.locking, uuid)
-               }()
-               err := sch.queue.Lock(uuid)
-               if err != nil {
-                       logger.WithError(err).Warn("error locking container")
-                       return
-               }
-               logger.Debug("lock succeeded")
-               ctr, ok := sch.queue.Get(uuid)
-               if !ok {
-                       logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
-               } else if ctr.State != arvados.ContainerStateLocked {
-                       logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
-               }
-       }()
+       err := sch.queue.Lock(uuid)
+       if err != nil {
+               logger.WithError(err).Warn("error locking container")
+               return
+       }
+       logger.Debug("lock succeeded")
+       ctr, ok := sch.queue.Get(uuid)
+       if !ok {
+               logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
+       } else if ctr.State != arvados.ContainerStateLocked {
+               logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
+       }
+}
+
+// Acquire a non-blocking lock for the specified UUID, returning true
+// if successful.  The op argument is used only for debug logs.
+//
+// If the lock is not available, uuidLock arranges to wake up the
+// scheduler after a short delay, so it can retry whatever operation
+// is trying to get the lock (if that operation is still worth doing).
+//
+// This mechanism helps avoid spamming the controller/database with
+// concurrent updates for any single container, even when the
+// scheduler loop is running frequently.
+func (sch *Scheduler) uuidLock(uuid, op string) bool {
+       sch.mtx.Lock()
+       defer sch.mtx.Unlock()
+       logger := sch.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Op":            op,
+       })
+       if op, locked := sch.uuidOp[uuid]; locked {
+               logger.Debugf("uuidLock not available, Op=%s in progress", op)
+               // Make sure the scheduler loop wakes up to retry.
+               sch.wakeup.Reset(time.Second / 4)
+               return false
+       }
+       logger.Debug("uuidLock acquired")
+       sch.uuidOp[uuid] = op
+       return true
+}
+
+func (sch *Scheduler) uuidUnlock(uuid string) {
+       sch.mtx.Lock()
+       defer sch.mtx.Unlock()
+       delete(sch.uuidOp, uuid)
 }
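
The uuidLock/uuidUnlock pair is the pattern the lock, cancel, and requeue paths all follow. A sketch of a typical caller in package scheduler; the method name is illustrative, the real callers are lockContainer above and cancel/requeue in sync.go.

    // exampleOp is a hypothetical per-container operation showing the
    // intended uuidLock usage.
    func (sch *Scheduler) exampleOp(uuid string) {
            if !sch.uuidLock(uuid, "example") {
                    // Another operation on this container is already in
                    // flight; the wakeup timer re-runs the scheduler loop
                    // shortly so this work can be retried if still needed.
                    return
            }
            defer sch.uuidUnlock(uuid)
            // ... make exactly one controller/API call for this container ...
    }
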
index 7dd6866c0f389e51a393300f9235053159582d51..4296a1364c911fc94d44af28512ecac195b4e5f5 100644 (file)
@@ -5,12 +5,14 @@
 package scheduler
 
 import (
+       "context"
        "sync"
        "time"
 
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        check "gopkg.in/check.v1"
 )
 
@@ -120,6 +122,7 @@ type SchedulerSuite struct{}
 // immediately. Don't try to create any other nodes after the failed
 // create.
 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
        queue := test.Queue{
                ChooseType: chooseType,
                Containers: []arvados.Container{
@@ -174,7 +177,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
                running:   map[string]time.Time{},
                canCreate: 0,
        }
-       New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+       New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
        c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
        c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
        c.Check(pool.running, check.HasLen, 1)
@@ -186,6 +189,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 // If Create() fails, shutdown some nodes, and don't call Create()
 // again.  Don't call Create() at all if AtQuota() is true.
 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
        for quota := 0; quota < 2; quota++ {
                c.Logf("quota=%d", quota)
                shouldCreate := []arvados.InstanceType{}
@@ -229,7 +233,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
                        starts:    []string{},
                        canCreate: 0,
                }
-               New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+               New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
                c.Check(pool.creates, check.DeepEquals, shouldCreate)
                c.Check(pool.starts, check.DeepEquals, []string{})
                c.Check(pool.shutdowns, check.Not(check.Equals), 0)
@@ -239,6 +243,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
 // Start lower-priority containers while waiting for new/existing
 // workers to come up for higher-priority containers.
 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
        pool := stubPool{
                unalloc: map[arvados.InstanceType]int{
                        test.InstanceType(1): 2,
@@ -317,7 +322,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                },
        }
        queue.Update()
-       New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+       New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
        c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
        c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
        running := map[string]bool{}
index 83fc08a9ffdb28c285965ca7a3f6cd41aba4dd7d..eb82c488390e3751fd7d3383acb1c1cf72af5e37 100644 (file)
@@ -7,9 +7,11 @@
 package scheduler
 
 import (
+       "context"
        "sync"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "github.com/sirupsen/logrus"
 )
 
@@ -32,8 +34,9 @@ type Scheduler struct {
        staleLockTimeout    time.Duration
        queueUpdateInterval time.Duration
 
-       locking map[string]bool
-       mtx     sync.Mutex
+       uuidOp map[string]string // operation in progress: "lock", "cancel", ...
+       mtx    sync.Mutex
+       wakeup *time.Timer
 
        runOnce sync.Once
        stop    chan struct{}
@@ -44,16 +47,17 @@ type Scheduler struct {
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
        return &Scheduler{
-               logger:              logger,
+               logger:              ctxlog.FromContext(ctx),
                queue:               queue,
                pool:                pool,
                staleLockTimeout:    staleLockTimeout,
                queueUpdateInterval: queueUpdateInterval,
+               wakeup:              time.NewTimer(time.Second),
                stop:                make(chan struct{}),
                stopped:             make(chan struct{}),
-               locking:             map[string]bool{},
+               uuidOp:              map[string]string{},
        }
 }
 
@@ -75,7 +79,10 @@ func (sch *Scheduler) run() {
        // Ensure the queue is fetched once before attempting anything.
        for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
                sch.logger.Errorf("error updating queue: %s", err)
-               d := sch.queueUpdateInterval / 60
+               d := sch.queueUpdateInterval / 10
+               if d < time.Second {
+                       d = time.Second
+               }
                sch.logger.Infof("waiting %s before retry", d)
                time.Sleep(d)
        }
@@ -111,6 +118,7 @@ func (sch *Scheduler) run() {
                        return
                case <-queueNotify:
                case <-poolNotify:
+               case <-sch.wakeup.C:
                }
        }
 }
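
The retry delay for a failing initial queue fetch is now derived from the poll interval with a one-second floor, rather than interval/60. As a sketch (package scheduler already imports "time"):

    // retryDelay mirrors the backoff used while the first queue update
    // keeps failing: a tenth of the poll interval, but at least 1s.
    func retryDelay(queueUpdateInterval time.Duration) time.Duration {
            d := queueUpdateInterval / 10
            if d < time.Second {
                    d = time.Second
            }
            return d
    }
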
index 47c754e243dab20ae127cd8741c9decb9eea9688..23fc621dea26c76be659ddc4f88bea7565f4bd4c 100644 (file)
@@ -6,7 +6,6 @@ package scheduler
 
 import (
        "fmt"
-       "time"
 
        "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -25,32 +24,17 @@ import (
 // cancelled.
 func (sch *Scheduler) sync() {
        running := sch.pool.Running()
-       cancel := func(ent container.QueueEnt, reason string) {
-               uuid := ent.Container.UUID
-               logger := sch.logger.WithField("ContainerUUID", uuid)
-               logger.Infof("cancelling container because %s", reason)
-               err := sch.queue.Cancel(uuid)
-               if err != nil {
-                       logger.WithError(err).Print("error cancelling container")
-               }
-       }
-       kill := func(ent container.QueueEnt, reason string) {
-               uuid := ent.Container.UUID
-               logger := sch.logger.WithField("ContainerUUID", uuid)
-               logger.Debugf("killing crunch-run process because %s", reason)
-               sch.pool.KillContainer(uuid)
-       }
        qEntries, qUpdated := sch.queue.Entries()
        for uuid, ent := range qEntries {
                exited, running := running[uuid]
                switch ent.Container.State {
                case arvados.ContainerStateRunning:
                        if !running {
-                               go cancel(ent, "not running on any worker")
+                               go sch.cancel(ent, "not running on any worker")
                        } else if !exited.IsZero() && qUpdated.After(exited) {
-                               go cancel(ent, "state=\"Running\" after crunch-run exited")
+                               go sch.cancel(ent, "state=\"Running\" after crunch-run exited")
                        } else if ent.Container.Priority == 0 {
-                               go kill(ent, fmt.Sprintf("priority=%d", ent.Container.Priority))
+                               go sch.kill(ent, "priority=0")
                        }
                case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
                        if running {
@@ -62,7 +46,7 @@ func (sch *Scheduler) sync() {
                                // of kill() will be to make the
                                // worker available for the next
                                // container.
-                               go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
                        } else {
                                sch.logger.WithFields(logrus.Fields{
                                        "ContainerUUID": uuid,
@@ -76,22 +60,60 @@ func (sch *Scheduler) sync() {
                                // a network outage and is still
                                // preparing to run a container that
                                // has already been unlocked/requeued.
-                               go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
                        }
                case arvados.ContainerStateLocked:
                        if running && !exited.IsZero() && qUpdated.After(exited) {
-                               logger := sch.logger.WithFields(logrus.Fields{
-                                       "ContainerUUID": uuid,
-                                       "Exited":        time.Since(exited).Seconds(),
-                               })
-                               logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
-                               err := sch.queue.Unlock(uuid)
-                               if err != nil {
-                                       logger.WithError(err).Info("error requeueing container")
-                               }
+                               go sch.requeue(ent, "crunch-run exited")
+                       } else if running && exited.IsZero() && ent.Container.Priority == 0 {
+                               go sch.kill(ent, "priority=0")
+                       } else if !running && ent.Container.Priority == 0 {
+                               go sch.requeue(ent, "priority=0")
                        }
                default:
-                       sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
+                       sch.logger.WithFields(logrus.Fields{
+                               "ContainerUUID": uuid,
+                               "State":         ent.Container.State,
+                       }).Error("BUG: unexpected state")
                }
        }
 }
+
+func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
+       uuid := ent.Container.UUID
+       if !sch.uuidLock(uuid, "cancel") {
+               return
+       }
+       defer sch.uuidUnlock(uuid)
+       logger := sch.logger.WithField("ContainerUUID", uuid)
+       logger.Infof("cancelling container because %s", reason)
+       err := sch.queue.Cancel(uuid)
+       if err != nil {
+               logger.WithError(err).Print("error cancelling container")
+       }
+}
+
+func (sch *Scheduler) kill(ent container.QueueEnt, reason string) {
+       uuid := ent.Container.UUID
+       logger := sch.logger.WithField("ContainerUUID", uuid)
+       logger.Debugf("killing crunch-run process because %s", reason)
+       sch.pool.KillContainer(uuid)
+}
+
+func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
+       uuid := ent.Container.UUID
+       if !sch.uuidLock(uuid, "requeue") {
+               return
+       }
+       defer sch.uuidUnlock(uuid)
+       logger := sch.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "State":         ent.Container.State,
+               "Priority":      ent.Container.Priority,
+       })
+       logger.Infof("requeueing locked container because %s", reason)
+       err := sch.queue.Unlock(uuid)
+       if err != nil {
+               logger.WithError(err).Error("error requeueing container")
+       }
+}
index d0fb54c54cd932df806e0129a0f92d78cd3a9999..feed1c2a78b82a84821f22eee99e39e960dbd431 100644 (file)
@@ -38,6 +38,7 @@ func New(t cloud.ExecutorTarget) *Executor {
 type Executor struct {
        target     cloud.ExecutorTarget
        targetPort string
+       targetUser string
        signers    []ssh.Signer
        mtx        sync.RWMutex // controls access to instance after creation
 
@@ -182,6 +183,9 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
        if h, p, err := net.SplitHostPort(addr); err != nil || p == "" {
                // Target address does not specify a port.  Use
                // targetPort, or "ssh".
+               if h == "" {
+                       h = addr
+               }
                if p = exr.targetPort; p == "" {
                        p = "ssh"
                }
@@ -189,7 +193,7 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
        }
        var receivedKey ssh.PublicKey
        client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
-               User: "root",
+               User: target.RemoteUser(),
                Auth: []ssh.AuthMethod{
                        ssh.PublicKeys(exr.signers...),
                },
index f8565b4a710705c1367a54f9a954a753ff1d2715..e7c023586b4bb3c09ac8968c35c2cc3f1ed01ee2 100644 (file)
@@ -73,6 +73,7 @@ func (s *ExecutorSuite) TestBadHostKey(c *check.C) {
                                return 0
                        },
                        HostKey:        hostpriv,
+                       AuthorizedUser: "username",
                        AuthorizedKeys: []ssh.PublicKey{clientpub},
                },
        }
@@ -121,6 +122,7 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
                                        return uint32(exitcode)
                                },
                                HostKey:        hostpriv,
+                               AuthorizedUser: "username",
                                AuthorizedKeys: []ssh.PublicKey{clientpub},
                        },
                }
diff --git a/lib/dispatchcloud/test/logger.go b/lib/dispatchcloud/test/logger.go
deleted file mode 100644 (file)
index a59eeb6..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package test
-
-import (
-       "os"
-
-       "github.com/sirupsen/logrus"
-)
-
-func Logger() logrus.FieldLogger {
-       logger := logrus.StandardLogger()
-       if os.Getenv("ARVADOS_DEBUG") != "" {
-               logger.SetLevel(logrus.DebugLevel)
-       }
-       return logger
-}
index ed5995f4c5f0faa84356c01f2777d1e0a366cbc1..f1fde4f422ce55198742871883f8a0bbd7c682d3 100644 (file)
@@ -39,6 +39,7 @@ type SSHExecFunc func(env map[string]string, command string, stdin io.Reader, st
 type SSHService struct {
        Exec           SSHExecFunc
        HostKey        ssh.Signer
+       AuthorizedUser string
        AuthorizedKeys []ssh.PublicKey
 
        listener net.Listener
@@ -64,6 +65,11 @@ func (ss *SSHService) Address() string {
        return ln.Addr().String()
 }
 
+// RemoteUser returns the username that will be accepted.
+func (ss *SSHService) RemoteUser() string {
+       return ss.AuthorizedUser
+}
+
 // Close shuts down the server and releases resources. Established
 // connections are unaffected.
 func (ss *SSHService) Close() {
@@ -103,7 +109,7 @@ func (ss *SSHService) run() {
        }
        config.AddHostKey(ss.HostKey)
 
-       listener, err := net.Listen("tcp", ":")
+       listener, err := net.Listen("tcp", "127.0.0.1:")
        if err != nil {
                ss.err = err
                return
index fbab30b175d674ceec0b18d2b1726dfd930df1d7..5873e492213b86f58eaa98850c5c00c073cd2aee 100644 (file)
@@ -10,6 +10,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "io/ioutil"
        math_rand "math/rand"
        "regexp"
        "strings"
@@ -61,6 +62,7 @@ func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID
        }
        sis := StubInstanceSet{
                driver:  sd,
+               logger:  logger,
                servers: map[cloud.InstanceID]*StubVM{},
        }
        sd.instanceSets = append(sd.instanceSets, &sis)
@@ -90,6 +92,7 @@ func (sd *StubDriver) ReleaseCloudOps(n int) {
 
 type StubInstanceSet struct {
        driver  *StubDriver
+       logger  logrus.FieldLogger
        servers map[cloud.InstanceID]*StubVM
        mtx     sync.RWMutex
        stopped bool
@@ -98,7 +101,7 @@ type StubInstanceSet struct {
        allowInstancesCall time.Time
 }
 
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
        if sis.driver.HoldCloudOps {
                sis.driver.holdCloudOps <- true
        }
@@ -122,9 +125,11 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
                id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
                tags:         copyTags(tags),
                providerType: it.ProviderType,
+               initCommand:  cmd,
        }
        svm.SSHService = SSHService{
                HostKey:        sis.driver.HostKey,
+               AuthorizedUser: "root",
                AuthorizedKeys: ak,
                Exec:           svm.Exec,
        }
@@ -182,6 +187,7 @@ type StubVM struct {
        sis          *StubInstanceSet
        id           cloud.InstanceID
        tags         cloud.InstanceTags
+       initCommand  cloud.InitCommand
        providerType string
        SSHService   SSHService
        running      map[string]bool
@@ -205,6 +211,11 @@ func (svm *StubVM) Instance() stubInstance {
 }
 
 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+       stdinData, err := ioutil.ReadAll(stdin)
+       if err != nil {
+               fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
+               return 1
+       }
        queue := svm.sis.driver.Queue
        uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
        if eta := svm.Boot.Sub(time.Now()); eta > 0 {
@@ -219,10 +230,16 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                fmt.Fprint(stderr, "crunch-run: command not found\n")
                return 1
        }
-       if strings.HasPrefix(command, "crunch-run --detach ") {
+       if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
+               var stdinKV map[string]string
+               err := json.Unmarshal(stdinData, &stdinKV)
+               if err != nil {
+                       fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
+                       return 1
+               }
                for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
-                       if env[name] == "" {
-                               fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
+                       if stdinKV[name] == "" {
+                               fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
                                return 1
                        }
                }
@@ -234,7 +251,7 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                svm.Unlock()
                time.Sleep(svm.CrunchRunDetachDelay)
                fmt.Fprintf(stderr, "starting %s\n", uuid)
-               logger := logrus.WithFields(logrus.Fields{
+               logger := svm.sis.logger.WithFields(logrus.Fields{
                        "Instance":      svm.id,
                        "ContainerUUID": uuid,
                })
@@ -319,6 +336,10 @@ func (si stubInstance) Address() string {
        return si.addr
 }
 
+func (si stubInstance) RemoteUser() string {
+       return si.svm.SSHService.AuthorizedUser
+}
+
 func (si stubInstance) Destroy() error {
        sis := si.svm.sis
        if sis.driver.HoldCloudOps {
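
The stub above exercises the new crunch-run invocation: API credentials travel as a JSON object on stdin instead of SSH session environment variables. A hedged sketch of the dispatcher side of that exchange, with placeholder values; the real code is in worker.startContainer further down.

    package example

    import (
            "bytes"
            "encoding/json"
    )

    // stdinEnvPayload builds the JSON document that crunch-run reads
    // when started with --detach --stdin-env.
    func stdinEnvPayload(apiHost, apiToken string) (*bytes.Buffer, error) {
            env := map[string]string{
                    "ARVADOS_API_HOST":  apiHost,
                    "ARVADOS_API_TOKEN": apiToken,
            }
            j, err := json.Marshal(env)
            if err != nil {
                    return nil, err
            }
            return bytes.NewBuffer(j), nil
    }
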
index e6b50629892e62c250ebcf05ba5bb226daec1ce1..fe1c6ecc0304f64345135f6016d29a8b1512fea1 100644 (file)
@@ -5,7 +5,9 @@
 package worker
 
 import (
+       "crypto/rand"
        "errors"
+       "fmt"
        "io"
        "sort"
        "strings"
@@ -16,16 +18,19 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
 )
 
 const (
-       tagKeyInstanceType = "InstanceType"
-       tagKeyIdleBehavior = "IdleBehavior"
+       tagKeyInstanceType   = "InstanceType"
+       tagKeyIdleBehavior   = "IdleBehavior"
+       tagKeyInstanceSecret = "InstanceSecret"
 )
 
 // An InstanceView shows a worker's current state and recent activity.
 type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
+       Address              string           `json:"address"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
@@ -84,7 +89,7 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
@@ -100,6 +105,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               installPublicKey:   installPublicKey,
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
@@ -130,6 +136,7 @@ type Pool struct {
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
+       installPublicKey   ssh.PublicKey
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
@@ -146,13 +153,11 @@ type Pool struct {
        throttleCreate    throttle
        throttleInstances throttle
 
-       mInstances         prometheus.Gauge
-       mInstancesPrice    prometheus.Gauge
        mContainersRunning prometheus.Gauge
-       mVCPUs             prometheus.Gauge
-       mVCPUsInuse        prometheus.Gauge
-       mMemory            prometheus.Gauge
-       mMemoryInuse       prometheus.Gauge
+       mInstances         *prometheus.GaugeVec
+       mInstancesPrice    *prometheus.GaugeVec
+       mVCPUs             *prometheus.GaugeVec
+       mMemory            *prometheus.GaugeVec
 }
 
 // Subscribe returns a buffered channel that becomes ready after any
@@ -254,15 +259,18 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
                return false
        }
-       tags := cloud.InstanceTags{
-               tagKeyInstanceType: it.Name,
-               tagKeyIdleBehavior: string(IdleBehaviorRun),
-       }
        now := time.Now()
        wp.creating[it] = append(wp.creating[it], now)
        go func() {
                defer wp.notify()
-               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+               secret := randomHex(instanceSecretLength)
+               tags := cloud.InstanceTags{
+                       tagKeyInstanceType:   it.Name,
+                       tagKeyIdleBehavior:   string(IdleBehaviorRun),
+                       tagKeyInstanceSecret: secret,
+               }
+               initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // Remove our timestamp marker from wp.creating
@@ -318,6 +326,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 //
 // Caller must have lock.
 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+       inst = tagVerifier{inst}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
@@ -343,7 +352,8 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
 
        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
-               "Instance":     inst,
+               "Instance":     inst.ID(),
+               "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
                "State":        initialState,
@@ -482,10 +492,14 @@ func (wp *Pool) KillContainer(uuid string) {
 func (wp *Pool) kill(wkr *worker, uuid string) {
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
-               "Instance":      wkr.instance,
+               "Instance":      wkr.instance.ID(),
        })
        logger.Debug("killing process")
-       stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
+       cmd := "crunch-run --kill 15 " + uuid
+       if u := wkr.instance.RemoteUser(); u != "root" {
+               cmd = "sudo " + cmd
+       }
+       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                logger.WithFields(logrus.Fields{
                        "stderr": string(stderr),
@@ -511,20 +525,6 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
-       wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "instances_total",
-               Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
-       })
-       reg.MustRegister(wp.mInstances)
-       wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "instances_price_total",
-               Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
-       })
-       reg.MustRegister(wp.mInstancesPrice)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
@@ -532,40 +532,40 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
-
-       wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_total",
+               Help:      "Number of cloud VMs.",
+       }, []string{"category"})
+       reg.MustRegister(wp.mInstances)
+       wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_price",
+               Help:      "Price of cloud VMs.",
+       }, []string{"category"})
+       reg.MustRegister(wp.mInstancesPrice)
+       wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
-       })
+       }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
-       wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "vcpus_inuse",
-               Help:      "VCPUs on cloud VMs that are running containers.",
-       })
-       reg.MustRegister(wp.mVCPUsInuse)
-       wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
-       })
+       }, []string{"category"})
        reg.MustRegister(wp.mMemory)
-       wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "memory_bytes_inuse",
-               Help:      "Memory on cloud VMs that are running containers.",
-       })
-       reg.MustRegister(wp.mMemoryInuse)
 }
 
 func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
+       wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
@@ -575,26 +575,38 @@ func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
-       var price float64
-       var alloc, cpu, cpuInuse, mem, memInuse int64
+       instances := map[string]int64{}
+       price := map[string]float64{}
+       cpu := map[string]int64{}
+       mem := map[string]int64{}
+       var running int64
        for _, wkr := range wp.workers {
-               price += wkr.instType.Price
-               cpu += int64(wkr.instType.VCPUs)
-               mem += int64(wkr.instType.RAM)
-               if len(wkr.running)+len(wkr.starting) == 0 {
-                       continue
+               var cat string
+               switch {
+               case len(wkr.running)+len(wkr.starting) > 0:
+                       cat = "inuse"
+               case wkr.idleBehavior == IdleBehaviorHold:
+                       cat = "hold"
+               case wkr.state == StateBooting:
+                       cat = "booting"
+               case wkr.state == StateUnknown:
+                       cat = "unknown"
+               default:
+                       cat = "idle"
                }
-               alloc += int64(len(wkr.running) + len(wkr.starting))
-               cpuInuse += int64(wkr.instType.VCPUs)
-               memInuse += int64(wkr.instType.RAM)
-       }
-       wp.mInstances.Set(float64(len(wp.workers)))
-       wp.mInstancesPrice.Set(price)
-       wp.mContainersRunning.Set(float64(alloc))
-       wp.mVCPUs.Set(float64(cpu))
-       wp.mMemory.Set(float64(mem))
-       wp.mVCPUsInuse.Set(float64(cpuInuse))
-       wp.mMemoryInuse.Set(float64(memInuse))
+               instances[cat]++
+               price[cat] += wkr.instType.Price
+               cpu[cat] += int64(wkr.instType.VCPUs)
+               mem[cat] += int64(wkr.instType.RAM)
+               running += int64(len(wkr.running) + len(wkr.starting))
+       }
+       for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
+               wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
+               wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
+               wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
+               wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+       }
+       wp.mContainersRunning.Set(float64(running))
 }
 
 func (wp *Pool) runProbes() {
@@ -673,6 +685,7 @@ func (wp *Pool) Instances() []InstanceView {
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
+                       Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
@@ -753,7 +766,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
-                       "Instance":    wkr.instance,
+                       "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
@@ -771,3 +784,14 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                go wp.notify()
        }
 }
+
+// Return a random string of n hexadecimal digits (n*4 random bits). n
+// must be even.
+func randomHex(n int) string {
+       buf := make([]byte, n/2)
+       _, err := rand.Read(buf)
+       if err != nil {
+               panic(err)
+       }
+       return fmt.Sprintf("%x", buf)
+}
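
With the gauges converted to GaugeVecs, each series now carries a category label instead of separate *_inuse metrics. Illustrative scrape output, with metric names taken from the code above and values invented:

    arvados_dispatchcloud_instances_total{category="idle"} 3
    arvados_dispatchcloud_instances_total{category="inuse"} 12
    arvados_dispatchcloud_instances_price{category="inuse"} 4.08
    arvados_dispatchcloud_vcpus_total{category="inuse"} 96
    arvados_dispatchcloud_memory_bytes_total{category="inuse"} 4.12316860416e+11
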
index 3b66eeb417e7fb310768d950b50b7ad6c7066f0c..da9e650b8121889511886e9b16dc8eb827fcee14 100644 (file)
@@ -12,6 +12,7 @@ import (
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
@@ -62,7 +63,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
 
-       logger := test.Logger()
+       logger := ctxlog.TestLogger(c)
        driver := &test.StubDriver{}
        is, err := driver.InstanceSet(nil, "", logger)
        c.Assert(err, check.IsNil)
@@ -90,7 +91,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                },
        }
 
-       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
        notify := pool.Subscribe()
        defer pool.Unsubscribe(notify)
        pool.Create(type1)
@@ -108,7 +109,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 
        c.Log("------- starting new pool, waiting to recover state")
 
-       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
        notify2 := pool2.Subscribe()
        defer pool2.Unsubscribe(notify2)
        waitForIdle(pool2, notify2)
@@ -124,7 +125,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 }
 
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
-       logger := test.Logger()
+       logger := ctxlog.TestLogger(c)
        driver := test.StubDriver{HoldCloudOps: true}
        instanceSet, err := driver.InstanceSet(nil, "", logger)
        c.Assert(err, check.IsNil)
diff --git a/lib/dispatchcloud/worker/verify.go b/lib/dispatchcloud/worker/verify.go
new file mode 100644 (file)
index 0000000..e22c85d
--- /dev/null
@@ -0,0 +1,56 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "bytes"
+       "errors"
+       "fmt"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "golang.org/x/crypto/ssh"
+)
+
+var (
+       errBadInstanceSecret = errors.New("bad instance secret")
+
+       // filename on instance, as given to shell (quoted accordingly)
+       instanceSecretFilename = "/var/run/arvados-instance-secret"
+       instanceSecretLength   = 40 // hex digits
+)
+
+type tagVerifier struct {
+       cloud.Instance
+}
+
+func (tv tagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) error {
+       expectSecret := tv.Instance.Tags()[tagKeyInstanceSecret]
+       if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || expectSecret == "" {
+               // If the wrapped instance indicates it has a way to
+               // verify the key, return that decision.
+               return err
+       }
+       session, err := client.NewSession()
+       if err != nil {
+               return err
+       }
+       defer session.Close()
+       var stdout, stderr bytes.Buffer
+       session.Stdin = bytes.NewBuffer(nil)
+       session.Stdout = &stdout
+       session.Stderr = &stderr
+       cmd := fmt.Sprintf("cat %s", instanceSecretFilename)
+       if u := tv.RemoteUser(); u != "root" {
+               cmd = "sudo " + cmd
+       }
+       err = session.Run(cmd)
+       if err != nil {
+               return err
+       }
+       if stdout.String() != expectSecret {
+               return errBadInstanceSecret
+       }
+       return nil
+}
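
The boot-time counterpart of this check is not part of this hunk. The sketch below only illustrates the assumption tagVerifier relies on: some driver-side mechanism must write the dispatcher-chosen secret to instanceSecretFilename before SSH probing starts, so that "cat" over SSH returns exactly the value stored in the instance's tags. The helper name and exact shell command are hypothetical.

package main

import "fmt"

// Illustrative only: whatever boot/init mechanism a driver uses, the instance
// must end up with the dispatcher's secret stored at instanceSecretFilename
// so that tagVerifier's "cat" comparison succeeds.
func instanceSecretBootCommand(secret string) string {
	const path = "/var/run/arvados-instance-secret"
	return fmt.Sprintf("umask 077 && echo -n '%s' >%s", secret, path)
}

func main() {
	fmt.Println(instanceSecretBootCommand("0123456789abcdef0123456789abcdef01234567"))
}
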
index a24747267615b9b0d0d0c8851271e92bdb85087c..9be9f41f43b7ef51cbb1d1257e4ac39f642472aa 100644 (file)
@@ -6,6 +6,7 @@ package worker
 
 import (
        "bytes"
+       "encoding/json"
        "fmt"
        "strings"
        "sync"
@@ -13,6 +14,7 @@ import (
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
 )
 
@@ -96,7 +98,7 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
-       logger = logger.WithField("Instance", wkr.instance)
+       logger = logger.WithField("Instance", wkr.instance.ID())
        logger.Debug("starting container")
        wkr.starting[ctr.UUID] = struct{}{}
        wkr.state = StateRunning
@@ -105,7 +107,19 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
                        "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
                        "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
                }
-               stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
+               if wkr.wp.arvClient.Insecure {
+                       env["ARVADOS_API_HOST_INSECURE"] = "1"
+               }
+               envJSON, err := json.Marshal(env)
+               if err != nil {
+                       panic(err)
+               }
+               stdin := bytes.NewBuffer(envJSON)
+               cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
+               if u := wkr.instance.RemoteUser(); u != "root" {
+                       cmd = "sudo " + cmd
+               }
+               stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
@@ -325,6 +339,9 @@ func (wkr *worker) probeAndUpdate() {
 
 func (wkr *worker) probeRunning() (running []string, ok bool) {
        cmd := "crunch-run --list"
+       if u := wkr.instance.RemoteUser(); u != "root" {
+               cmd = "sudo " + cmd
+       }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
@@ -400,7 +417,7 @@ func (wkr *worker) shutdownIfIdle() bool {
 
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
-               "Age":          age,
+               "IdleDuration": stats.Duration(age),
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown idle worker")
        wkr.shutdown()
@@ -427,22 +444,24 @@ func (wkr *worker) shutdown() {
 // match. Caller must have lock.
 func (wkr *worker) saveTags() {
        instance := wkr.instance
-       have := instance.Tags()
-       want := cloud.InstanceTags{
+       tags := instance.Tags()
+       update := cloud.InstanceTags{
                tagKeyInstanceType: wkr.instType.Name,
                tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
-       go func() {
-               for k, v := range want {
-                       if v == have[k] {
-                               continue
-                       }
-                       err := instance.SetTags(want)
+       save := false
+       for k, v := range update {
+               if tags[k] != v {
+                       tags[k] = v
+                       save = true
+               }
+       }
+       if save {
+               go func() {
+                       err := instance.SetTags(tags)
                        if err != nil {
-                               wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+                               wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
                        }
-                       break
-
-               }
-       }()
+               }()
+       }
 }
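
Pulled out of context, the command/stdin pair assembled in startContainer above looks like this. The helper below is an illustrative sketch, not a function added by this change; it shows the two conventions introduced here: the environment travels as JSON on stdin (--stdin-env), and the whole command is wrapped in sudo when the configured remote user is not root.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// buildCrunchRunInvocation mirrors the command construction in startContainer;
// the Execute call over SSH is elided.
func buildCrunchRunInvocation(uuid, remoteUser string, env map[string]string) (cmd string, stdin *bytes.Buffer, err error) {
	envJSON, err := json.Marshal(env)
	if err != nil {
		return "", nil, err
	}
	cmd = "crunch-run --detach --stdin-env '" + uuid + "'"
	if remoteUser != "root" {
		cmd = "sudo " + cmd
	}
	return cmd, bytes.NewBuffer(envJSON), nil
}

func main() {
	cmd, stdin, _ := buildCrunchRunInvocation("zzzzz-dz642-zzzzzzzzzzzzzzz", "ubuntu", map[string]string{
		"ARVADOS_API_HOST":  "zzzzz.example.com",
		"ARVADOS_API_TOKEN": "xxxxx",
	})
	fmt.Println(cmd)
	fmt.Println(stdin.String())
}
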
index 2eb5255b87fb174410b05d2e1c9d8e66800187e3..3bc33b62c9fee896f107278a564b859d1448366e 100644 (file)
@@ -12,6 +12,7 @@ import (
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        check "gopkg.in/check.v1"
 )
 
@@ -20,13 +21,13 @@ var _ = check.Suite(&WorkerSuite{})
 type WorkerSuite struct{}
 
 func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
-       logger := test.Logger()
+       logger := ctxlog.TestLogger(c)
        bootTimeout := time.Minute
        probeTimeout := time.Second
 
        is, err := (&test.StubDriver{}).InstanceSet(nil, "", logger)
        c.Assert(err, check.IsNil)
-       inst, err := is.Create(arvados.InstanceType{}, "", nil, nil)
+       inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
        c.Assert(err, check.IsNil)
 
        type trialT struct {
index 7f6b0236cb9571cd3ca30420cb6d41af6d787bd3..d99af0eea15428054fd5adc16596ca89b1de7820 100644 (file)
@@ -6,6 +6,7 @@
 package service
 
 import (
+       "context"
        "flag"
        "fmt"
        "io"
@@ -14,6 +15,7 @@ import (
 
        "git.curoverse.com/arvados.git/lib/cmd"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "github.com/coreos/go-systemd/daemon"
        "github.com/sirupsen/logrus"
@@ -24,7 +26,7 @@ type Handler interface {
        CheckHealth() error
 }
 
-type NewHandlerFunc func(*arvados.Cluster, *arvados.NodeProfile) Handler
+type NewHandlerFunc func(context.Context, *arvados.Cluster, *arvados.NodeProfile) Handler
 
 type command struct {
        newHandler NewHandlerFunc
@@ -45,11 +47,7 @@ func Command(svcName arvados.ServiceName, newHandler NewHandlerFunc) cmd.Handler
 }
 
 func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
-       log := logrus.New()
-       log.Formatter = &logrus.JSONFormatter{
-               TimestampFormat: rfc3339NanoFixed,
-       }
-       log.Out = stderr
+       log := ctxlog.New(stderr, "json", "info")
 
        var err error
        defer func() {
@@ -76,6 +74,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        if err != nil {
                return 1
        }
+       log = ctxlog.New(stderr, cluster.Logging.Format, cluster.Logging.Level).WithFields(logrus.Fields{
+               "PID": os.Getpid(),
+       })
+       ctx := ctxlog.Context(context.Background(), log)
        profileName := *nodeProfile
        if profileName == "" {
                profileName = os.Getenv("ARVADOS_NODE_PROFILE")
@@ -89,7 +91,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName)
                return 1
        }
-       handler := c.newHandler(cluster, profile)
+       handler := c.newHandler(ctx, cluster, profile)
        if err = handler.CheckHealth(); err != nil {
                return 1
        }
index 52fd4d21a115f29ee7fb388c11ebeb0564b40ae2..834ca195fdda02d859eafcd4aa0cea5c70c1c359 100644 (file)
@@ -289,10 +289,12 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
         executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
-    except Exception as e:
-        logger.error(e)
+    except Exception:
+        logger.exception("Error creating the Arvados CWL Executor")
         return 1
 
+    # Note that unless in debug mode, some stack traces related to user
+    # workflow errors may be suppressed. See ArvadosJob.done().
     if arvargs.debug:
         logger.setLevel(logging.DEBUG)
         logging.getLogger('arvados').setLevel(logging.DEBUG)
index af7c02a8f30010bfe85e51a6928e63a5a617d37e..9f93f0a6a6895780b2aca6e5ed235aca64d6aac9 100644 (file)
@@ -304,8 +304,8 @@ class ArvadosContainer(JobBase):
                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
             else:
                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-        except Exception as e:
-            logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
+        except Exception:
+            logger.exception("%s got an error", self.arvrunner.label(self))
             self.output_callback({}, "permanentFail")
 
     def done(self, record):
@@ -342,7 +342,7 @@ class ArvadosContainer(JobBase):
             if record["output_uuid"]:
                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                     # Compute the trash time to avoid requesting the collection record.
-                    trash_at = ciso8601.parse_datetime_unaware(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
+                    trash_at = ciso8601.parse_datetime(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                     aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                     orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                     oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
@@ -353,11 +353,13 @@ class ArvadosContainer(JobBase):
             if container["output"]:
                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
         except WorkflowException as e:
+            # Only include a stack trace if in debug mode.
+            # A stack trace may obscure more useful output about the workflow.
             logger.error("%s unable to collect output from %s:\n%s",
                          self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
             processStatus = "permanentFail"
-        except Exception as e:
-            logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
+        except Exception:
+            logger.exception("%s while getting output object:", self.arvrunner.label(self))
             processStatus = "permanentFail"
         finally:
             self.output_callback(outputs, processStatus)
@@ -523,8 +525,8 @@ class RunnerContainer(Runner):
             container = self.arvrunner.api.containers().get(
                 uuid=record["container_uuid"]
             ).execute(num_retries=self.arvrunner.num_retries)
-        except Exception as e:
-            logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e)
+        except Exception:
+            logger.exception("%s while getting runner container", self.arvrunner.label(self))
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             super(RunnerContainer, self).done(container)
index 84006b47d2a8ba86fd97f88b63772a53e3d711f6..a8f56ad1d4f30db21c5a59f0fb6df9258c723c58 100644 (file)
@@ -63,6 +63,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
                 arvados.commands.put.api_client = api_client
                 arvados.commands.keepdocker.main(args, stdout=sys.stderr, install_sig_handlers=False, api=api_client)
             except SystemExit as e:
+                # If e.code is None or zero, then keepdocker exited normally and we can continue
                 if e.code:
                     raise WorkflowException("keepdocker exited with code %s" % e.code)
 
index 69fe7e2a8f1b632675b16eeb3f6ad07ee76c00e8..ab2078e1571145aac5f334e076bfc00c7b951448 100644 (file)
@@ -199,7 +199,7 @@ class ArvadosJob(JobBase):
                                     e)
             else:
                 logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-        except Exception as e:
+        except Exception:
             logger.exception("%s error" % (self.arvrunner.label(self)))
             self.output_callback({}, "permanentFail")
 
@@ -224,8 +224,8 @@ class ArvadosJob(JobBase):
                             body={
                                 "components": components
                             }).execute(num_retries=self.arvrunner.num_retries)
-                except Exception as e:
-                    logger.info("Error adding to components: %s", e)
+                except Exception:
+                    logger.exception("Error adding to components")
 
     def done(self, record):
         try:
@@ -272,10 +272,12 @@ class ArvadosJob(JobBase):
                         outputs = done.done(self, record, dirs["tmpdir"],
                                             dirs["outdir"], dirs["keep"])
             except WorkflowException as e:
+                # Only include a stack trace if in debug mode.
+                # This is most likely a user workflow error, and a stack trace may obscure more useful output.
                 logger.error("%s unable to collect output from %s:\n%s",
                              self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
-            except Exception as e:
+            except Exception:
                 logger.exception("Got unknown exception while collecting output for job %s:", self.name)
                 processStatus = "permanentFail"
 
index 535cfd7582b985ad806d659f518f9da9ce0e6fbc..319e8a887114b88b55865ca673dbafb3e0b9a7dc 100644 (file)
@@ -59,6 +59,7 @@ class RuntimeStatusLoggingHandler(logging.Handler):
     def __init__(self, runtime_status_update_func):
         super(RuntimeStatusLoggingHandler, self).__init__()
         self.runtime_status_update = runtime_status_update_func
+        self.updatingRuntimeStatus = False
 
     def emit(self, record):
         kind = None
@@ -66,22 +67,27 @@ class RuntimeStatusLoggingHandler(logging.Handler):
             kind = 'error'
         elif record.levelno >= logging.WARNING:
             kind = 'warning'
-        if kind is not None:
-            log_msg = record.getMessage()
-            if '\n' in log_msg:
-                # If the logged message is multi-line, use its first line as status
-                # and the rest as detail.
-                status, detail = log_msg.split('\n', 1)
-                self.runtime_status_update(
-                    kind,
-                    "%s: %s" % (record.name, status),
-                    detail
-                )
-            else:
-                self.runtime_status_update(
-                    kind,
-                    "%s: %s" % (record.name, record.getMessage())
-                )
+        if kind is not None and self.updatingRuntimeStatus is not True:
+            self.updatingRuntimeStatus = True
+            try:
+                log_msg = record.getMessage()
+                if '\n' in log_msg:
+                    # If the logged message is multi-line, use its first line as status
+                    # and the rest as detail.
+                    status, detail = log_msg.split('\n', 1)
+                    self.runtime_status_update(
+                        kind,
+                        "%s: %s" % (record.name, status),
+                        detail
+                    )
+                else:
+                    self.runtime_status_update(
+                        kind,
+                        "%s: %s" % (record.name, record.getMessage())
+                    )
+            finally:
+                self.updatingRuntimeStatus = False
+
 
 class ArvCwlExecutor(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
@@ -361,8 +367,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                     keys = keys[pageSize:]
                     try:
                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
-                    except Exception as e:
-                        logger.warning("Error checking states on API server: %s", e)
+                    except Exception:
+                        logger.exception("Error checking states on API server")
                         remain_wait = self.poll_interval
                         continue
 
@@ -393,9 +399,9 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         for i in self.intermediate_output_collections:
             try:
                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
-            except:
+            except Exception:
                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
-            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+            except (KeyboardInterrupt, SystemExit):
                 break
 
     def check_features(self, obj):
@@ -506,8 +512,9 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                               body={
                                                   'is_trashed': True
                                               }).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.info("Setting container output: %s", e)
+            except Exception:
+                logger.exception("Setting container output")
+                return
         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                    body={
@@ -731,8 +738,11 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         except:
             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                 logger.error("Interrupted, workflow will be cancelled")
+            elif isinstance(sys.exc_info()[1], WorkflowException):
+                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             else:
-                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.exception("Workflow execution failed")
+
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
index fc7cc42d15c61f19021f006cb1eec18ab94178cd..3744b4a93afa40df10c17335310503500acd2432 100644 (file)
@@ -240,8 +240,8 @@ class CollectionFetcher(DefaultFetcher):
                     return True
         except arvados.errors.NotFoundError:
             return False
-        except:
-            logger.exception("Got unexpected exception checking if file exists:")
+        except Exception:
+            logger.exception("Got unexpected exception checking if file exists")
             return False
         return super(CollectionFetcher, self).check_exists(url)
 
index ad8e903ba65f28bec56710a3ed15915482362d72..e515ac2ce5e99f4ec75011b8ac51bfe2fc1bbff8 100644 (file)
@@ -7,15 +7,19 @@ standard_library.install_aliases()
 from future.utils import  viewvalues, viewitems
 
 import os
+import sys
 import urllib.parse
 from functools import partial
 import logging
 import json
-import subprocess32 as subprocess
 from collections import namedtuple
-
 from io import StringIO
 
+if os.name == "posix" and sys.version_info[0] < 3:
+    import subprocess32 as subprocess
+else:
+    import subprocess
+
 from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
@@ -474,8 +478,8 @@ class Runner(Process):
                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
             adjustFileObjs(outputs, keepify)
             adjustDirObjs(outputs, keepify)
-        except Exception as e:
-            logger.exception("[%s] While getting final output object: %s", self.name, e)
+        except Exception:
+            logger.exception("[%s] While getting final output object", self.name)
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
index 22c49a01bf4ed31592db013ac5f2cb49c4e789cd..d97e7428da0488e04009d4a0baeca01bbb18aa8b 100644 (file)
@@ -39,9 +39,12 @@ setup(name='arvados-cwl-runner',
           'ruamel.yaml >=0.15.54, <=0.15.77',
           'arvados-python-client>=1.3.0.20190205182514',
           'setuptools',
-          'ciso8601 >=1.0.6, <2.0.0',
-          'subprocess32>=3.5.1',
+          'ciso8601 >= 2.0.0',
       ],
+      extras_require={
+          ':os.name=="posix" and python_version<"3"': ['subprocess32 >= 3.5.1'],
+          ':python_version<"3"': ['pytz'],
+      },
       data_files=[
           ('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
       ],
index de21fc0b92d1c7437978be0a5018c3ea51fd76c3..1a57da3927a352e614f5a65ebb46887864ece07b 100644 (file)
@@ -80,6 +80,23 @@ class TestContainer(unittest.TestCase):
 
         return loadingContext, runtimeContext
 
+    # Helper function to set up the ArvCwlExecutor to use the containers API
+    # and test that the RuntimeStatusLoggingHandler is set up correctly.
+    def setup_and_test_container_executor_and_logging(self, gcc_mock):
+        api = mock.MagicMock()
+        api._rootDesc = copy.deepcopy(get_rootDesc())
+        del api._rootDesc.get('resources')['jobs']['methods']['create']
+
+        # Make sure ArvCwlExecutor thinks it's running inside a container so it
+        # adds the logging handler that will call runtime_status_update() mock
+        self.assertFalse(gcc_mock.called)
+        runner = arvados_cwl.ArvCwlExecutor(api)
+        self.assertEqual(runner.work_api, 'containers')
+        root_logger = logging.getLogger('')
+        handlerClasses = [h.__class__ for h in root_logger.handlers]
+        self.assertTrue(arvados_cwl.RuntimeStatusLoggingHandler in handlerClasses)
+        return runner
+
     # The test passes no builder.resources
     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
@@ -500,28 +517,30 @@ class TestContainer(unittest.TestCase):
         arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
         runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
 
+    # Test to make sure we don't call runtime_status_update if we already did
+    # somewhere higher up in the call stack.
     @mock.patch("arvados_cwl.util.get_current_container")
-    @mock.patch("arvados.collection.CollectionReader")
-    @mock.patch("arvados.collection.Collection")
-    def test_child_failure(self, col, reader, gcc_mock):
-        api = mock.MagicMock()
-        api._rootDesc = copy.deepcopy(get_rootDesc())
-        del api._rootDesc.get('resources')['jobs']['methods']['create']
+    def test_recursive_runtime_status_update(self, gcc_mock):
+        self.setup_and_test_container_executor_and_logging(gcc_mock)
+        root_logger = logging.getLogger('')
 
-        # Set up runner with mocked runtime_status_update()
-        self.assertFalse(gcc_mock.called)
-        runtime_status_update = mock.MagicMock()
-        arvados_cwl.ArvCwlExecutor.runtime_status_update = runtime_status_update
-        runner = arvados_cwl.ArvCwlExecutor(api)
-        self.assertEqual(runner.work_api, 'containers')
+        # get_current_container is invoked when we call runtime_status_update
+        # so try and log again!
+        gcc_mock.side_effect = lambda *args: root_logger.error("Second Error")
+        try:
+            root_logger.error("First Error")
+        except RuntimeError:
+            self.fail("RuntimeStatusLoggingHandler should not be called recursively")
 
-        # Make sure ArvCwlExecutor thinks it's running inside a container so it
-        # adds the logging handler that will call runtime_status_update() mock
+    @mock.patch("arvados_cwl.ArvCwlExecutor.runtime_status_update")
+    @mock.patch("arvados_cwl.util.get_current_container")
+    @mock.patch("arvados.collection.CollectionReader")
+    @mock.patch("arvados.collection.Collection")
+    def test_child_failure(self, col, reader, gcc_mock, rts_mock):
+        runner = self.setup_and_test_container_executor_and_logging(gcc_mock)
+
         gcc_mock.return_value = {"uuid" : "zzzzz-dz642-zzzzzzzzzzzzzzz"}
         self.assertTrue(gcc_mock.called)
-        root_logger = logging.getLogger('')
-        handlerClasses = [h.__class__ for h in root_logger.handlers]
-        self.assertTrue(arvados_cwl.RuntimeStatusLoggingHandler in handlerClasses)
 
         runner.num_retries = 0
         runner.ignore_docker_for_reuse = False
@@ -565,7 +584,7 @@ class TestContainer(unittest.TestCase):
             "modified_at": "2017-05-26T12:01:22Z"
         })
 
-        runtime_status_update.assert_called_with(
+        rts_mock.assert_called_with(
             'error',
             'arvados.cwl-runner: [container testjob] (zzzzz-xvhdp-zzzzzzzzzzzzzzz) error log:',
             '  ** log is empty **'
index 636cf350c151b150c59a96c45c312ea7af6d45c2..c2154d0f29cd1dbb6decad7962036dd9073bd24e 100644 (file)
@@ -66,6 +66,12 @@ type Cluster struct {
        RemoteClusters     map[string]RemoteCluster
        PostgreSQL         PostgreSQL
        RequestLimits      RequestLimits
+       Logging            Logging
+}
+
+type Logging struct {
+       Level  string
+       Format string
 }
 
 type PostgreSQL struct {
@@ -168,6 +174,9 @@ func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
                        if _, ok := (*it)[t.Name]; ok {
                                return errDuplicateInstanceTypeName
                        }
+                       if t.ProviderType == "" {
+                               t.ProviderType = t.Name
+                       }
                        (*it)[t.Name] = t
                }
                return nil
@@ -177,10 +186,14 @@ func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
        if err != nil {
                return err
        }
-       // Fill in Name field using hash key.
+       // Fill in Name field (and ProviderType field, if not
+       // specified) using hash key.
        *it = InstanceTypeMap(hash)
        for name, t := range *it {
                t.Name = name
+               if t.ProviderType == "" {
+                       t.ProviderType = name
+               }
                (*it)[name] = t
        }
        return nil
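
A standalone sketch of the defaulting rule added to InstanceTypeMap.UnmarshalJSON: an instance type whose ProviderType is omitted from the cluster config inherits the type's name (the map key). The simplified struct below stands in for arvados.InstanceType.

package main

import (
	"encoding/json"
	"fmt"
)

// instanceType is a cut-down stand-in for arvados.InstanceType.
type instanceType struct {
	Name         string
	ProviderType string
	Price        float64
}

func main() {
	raw := []byte(`{"m4.large": {"Price": 0.1}, "gpu": {"ProviderType": "p2.xlarge", "Price": 0.9}}`)
	var types map[string]instanceType
	if err := json.Unmarshal(raw, &types); err != nil {
		panic(err)
	}
	for name, t := range types {
		t.Name = name
		if t.ProviderType == "" {
			t.ProviderType = name // same fallback as InstanceTypeMap.UnmarshalJSON
		}
		types[name] = t
	}
	fmt.Printf("%+v\n", types)
}
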
index 45e4efdbeff2c5e5e507e92c94c85c8a189d8263..e66eeadee1e1fc8d6b50cd3e10fa59e8a5a66a80 100644 (file)
@@ -5,9 +5,13 @@
 package ctxlog
 
 import (
+       "bytes"
        "context"
+       "io"
+       "os"
 
        "github.com/sirupsen/logrus"
+       check "gopkg.in/check.v1"
 )
 
 var (
@@ -19,45 +23,87 @@ const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
 
 // Context returns a new child context such that FromContext(child)
 // returns the given logger.
-func Context(ctx context.Context, logger *logrus.Entry) context.Context {
+func Context(ctx context.Context, logger logrus.FieldLogger) context.Context {
        return context.WithValue(ctx, loggerCtxKey, logger)
 }
 
 // FromContext returns the logger suitable for the given context -- the one
 // attached by contextWithLogger() if applicable, otherwise the
 // top-level logger with no fields/values.
-func FromContext(ctx context.Context) *logrus.Entry {
+func FromContext(ctx context.Context) logrus.FieldLogger {
        if ctx != nil {
-               if logger, ok := ctx.Value(loggerCtxKey).(*logrus.Entry); ok {
+               if logger, ok := ctx.Value(loggerCtxKey).(logrus.FieldLogger); ok {
                        return logger
                }
        }
        return rootLogger.WithFields(nil)
 }
 
+// New returns a new logger with the indicated format and
+// level.
+func New(out io.Writer, format, level string) logrus.FieldLogger {
+       logger := logrus.New()
+       logger.Out = out
+       setFormat(logger, format)
+       setLevel(logger, level)
+       return logger
+}
+
+func TestLogger(c *check.C) logrus.FieldLogger {
+       logger := logrus.New()
+       logger.Out = &logWriter{c.Log}
+       setFormat(logger, "text")
+       if d := os.Getenv("ARVADOS_DEBUG"); d != "0" && d != "" {
+               setLevel(logger, "debug")
+       } else {
+               setLevel(logger, "info")
+       }
+       return logger
+}
+
 // SetLevel sets the current logging level. See logrus for level
 // names.
 func SetLevel(level string) {
-       lvl, err := logrus.ParseLevel(level)
-       if err != nil {
-               logrus.Fatal(err)
+       setLevel(rootLogger, level)
+}
+
+func setLevel(logger *logrus.Logger, level string) {
+       if level == "" {
+       } else if lvl, err := logrus.ParseLevel(level); err != nil {
+               logrus.WithField("Level", level).Fatal("unknown log level")
+       } else {
+               logger.Level = lvl
        }
-       rootLogger.Level = lvl
 }
 
 // SetFormat sets the current logging format to "json" or "text".
 func SetFormat(format string) {
+       setFormat(rootLogger, format)
+}
+
+func setFormat(logger *logrus.Logger, format string) {
        switch format {
        case "text":
-               rootLogger.Formatter = &logrus.TextFormatter{
+               logger.Formatter = &logrus.TextFormatter{
                        FullTimestamp:   true,
                        TimestampFormat: rfc3339NanoFixed,
                }
-       case "json":
-               rootLogger.Formatter = &logrus.JSONFormatter{
+       case "json", "":
+               logger.Formatter = &logrus.JSONFormatter{
                        TimestampFormat: rfc3339NanoFixed,
                }
        default:
-               logrus.WithField("LogFormat", format).Fatal("unknown log format")
+               logrus.WithField("Format", format).Fatal("unknown log format")
        }
 }
+
+// logWriter is an io.Writer that writes by calling a "write log"
+// function, typically (*check.C).Log().
+type logWriter struct {
+       logfunc func(...interface{})
+}
+
+func (tl *logWriter) Write(buf []byte) (int, error) {
+       tl.logfunc(string(bytes.TrimRight(buf, "\n")))
+       return len(buf), nil
+}
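
Taken together, New, Context and FromContext give services the pattern used in lib/service/cmd.go above: build a logger from the cluster's format/level settings, attach it to a context, and recover it deeper in the call stack. A minimal usage sketch (buildable against this repository; the field values are illustrative):

package main

import (
	"context"
	"os"

	"git.curoverse.com/arvados.git/sdk/go/ctxlog"
)

func main() {
	// Build a logger the same way RunCommand does, then attach it to a context.
	logger := ctxlog.New(os.Stderr, "json", "info").WithField("PID", os.Getpid())
	ctx := ctxlog.Context(context.Background(), logger)

	// ...deep inside request handling code, FromContext returns the attached
	// logger, or the package-level root logger if none was attached.
	ctxlog.FromContext(ctx).Info("this log entry carries the PID field")
}
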
index 031be97514e8f5bc933f9311bc348b02d60b51b0..2b8bbee6721ffd3f47e0304d81ed4f1f2a51a7da 100644 (file)
@@ -91,7 +91,6 @@ public class Arvados {
       }
     }
     arvadosRootUrl = "https://" + arvadosApiHost;
-    arvadosRootUrl += (arvadosApiHost.endsWith("/")) ? "" : "/";
 
     if (hostInsecure != null) {
       arvadosApiHostInsecure = Boolean.valueOf(hostInsecure);
index b579d41ed2839b73fa72c6486101abe97e46a0cc..485c757e7fce34dda579185608f39bfe4911bd94 100644 (file)
@@ -55,13 +55,13 @@ def normalize_stream(stream_name, stream):
                 if streamoffset == current_span[1]:
                     current_span[1] += segment.segment_size
                 else:
-                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+                    stream_tokens.append(u"{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                     current_span = [streamoffset, streamoffset + segment.segment_size]
 
         if current_span is not None:
-            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+            stream_tokens.append(u"{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
 
         if not stream[streamfile]:
-            stream_tokens.append("0:0:{0}".format(fout))
+            stream_tokens.append(u"0:0:{0}".format(fout))
 
     return stream_tokens
index 7ad07cc607206fe32f46fe0c94cf9ea34e115224..cf1a36f9fdfbbfdf739fe75027d00eaa782df4f2 100644 (file)
@@ -283,7 +283,7 @@ class CollectionWriter(CollectionBase):
             streampath, filename = split(streampath)
         if self._last_open and not self._last_open.closed:
             raise errors.AssertionError(
-                "can't open '{}' when '{}' is still open".format(
+                u"can't open '{}' when '{}' is still open".format(
                     filename, self._last_open.name))
         if streampath != self.current_stream_name():
             self.start_new_stream(streampath)
@@ -461,22 +461,22 @@ class ResumableCollectionWriter(CollectionWriter):
                 writer._queued_file.seek(pos)
             except IOError as error:
                 raise errors.StaleWriterStateError(
-                    "failed to reopen active file {}: {}".format(path, error))
+                    u"failed to reopen active file {}: {}".format(path, error))
         return writer
 
     def check_dependencies(self):
         for path, orig_stat in listitems(self._dependencies):
             if not S_ISREG(orig_stat[ST_MODE]):
-                raise errors.StaleWriterStateError("{} not file".format(path))
+                raise errors.StaleWriterStateError(u"{} not file".format(path))
             try:
                 now_stat = tuple(os.stat(path))
             except OSError as error:
                 raise errors.StaleWriterStateError(
-                    "failed to stat {}: {}".format(path, error))
+                    u"failed to stat {}: {}".format(path, error))
             if ((not S_ISREG(now_stat[ST_MODE])) or
                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
-                raise errors.StaleWriterStateError("{} changed".format(path))
+                raise errors.StaleWriterStateError(u"{} changed".format(path))
 
     def dump_state(self, copy_func=lambda x: x):
         state = {attr: copy_func(getattr(self, attr))
@@ -492,7 +492,7 @@ class ResumableCollectionWriter(CollectionWriter):
         try:
             src_path = os.path.realpath(source)
         except Exception:
-            raise errors.AssertionError("{} not a file path".format(source))
+            raise errors.AssertionError(u"{} not a file path".format(source))
         try:
             path_stat = os.stat(src_path)
         except OSError as stat_error:
@@ -505,10 +505,10 @@ class ResumableCollectionWriter(CollectionWriter):
             self._dependencies[source] = tuple(fd_stat)
         elif path_stat is None:
             raise errors.AssertionError(
-                "could not stat {}: {}".format(source, stat_error))
+                u"could not stat {}: {}".format(source, stat_error))
         elif path_stat.st_ino != fd_stat.st_ino:
             raise errors.AssertionError(
-                "{} changed between open and stat calls".format(source))
+                u"{} changed between open and stat calls".format(source))
         else:
             self._dependencies[src_path] = tuple(fd_stat)
 
@@ -1347,7 +1347,10 @@ class Collection(RichCollectionBase):
 
     def get_trash_at(self):
         if self._api_response and self._api_response["trash_at"]:
-            return ciso8601.parse_datetime(self._api_response["trash_at"])
+            try:
+                return ciso8601.parse_datetime(self._api_response["trash_at"])
+            except ValueError:
+                return None
         else:
             return None
 
index 811f096c60808b2033c8123d5330909ee88db816..e596e669156f9014ce5b9ad66d05fc2ba561bb3a 100644 (file)
@@ -10,17 +10,20 @@ import errno
 import json
 import os
 import re
-import subprocess32 as subprocess
 import sys
 import tarfile
 import tempfile
 import shutil
 import _strptime
 import fcntl
-
 from operator import itemgetter
 from stat import *
 
+if os.name == "posix" and sys.version_info[0] < 3:
+    import subprocess32 as subprocess
+else:
+    import subprocess
+
 import arvados
 import arvados.util
 import arvados.commands._util as arv_cmd
@@ -227,12 +230,15 @@ def docker_link_sort_key(link):
     Docker metadata links to sort them from least to most preferred.
     """
     try:
-        image_timestamp = ciso8601.parse_datetime_unaware(
+        image_timestamp = ciso8601.parse_datetime(
             link['properties']['image_timestamp'])
     except (KeyError, ValueError):
         image_timestamp = EARLIEST_DATETIME
-    return (image_timestamp,
-            ciso8601.parse_datetime_unaware(link['created_at']))
+    try:
+        created_timestamp = ciso8601.parse_datetime(link['created_at'])
+    except ValueError:
+        created_timestamp = None
+    return (image_timestamp, created_timestamp)
 
 def _get_docker_links(api_client, num_retries, **kwargs):
     links = arvados.util.list_all(api_client.links().list,
index 61258632bdd94f7acc684eaab50c442046f29f2e..54fa356d3a4537364ab770175addc742ade4826d 100644 (file)
@@ -360,7 +360,7 @@ class ResumeCache(object):
         try:
             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
         except IOError:
-            raise ResumeCacheConflict("{} locked".format(fileobj.name))
+            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
 
     def load(self):
         self.cache_file.seek(0)
@@ -502,7 +502,7 @@ class ArvPutUploadJob(object):
                     raise ArvPutUploadIsPending()
                 self._write_stdin(self.filename or 'stdin')
             elif not os.path.exists(path):
-                 raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
+                 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
             elif os.path.isdir(path):
                 # Use absolute paths on cache index so CWD doesn't interfere
                 # with the caching logic.
@@ -742,7 +742,7 @@ class ArvPutUploadJob(object):
             elif file_in_local_collection.permission_expired():
                 # Permission token expired, re-upload file. This will change whenever
                 # we have a API for refreshing tokens.
-                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
+                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                 should_upload = True
                 self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
@@ -757,7 +757,7 @@ class ArvPutUploadJob(object):
                 # Inconsistent cache, re-upload the file
                 should_upload = True
                 self._local_collection.remove(filename)
-                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
         # Local file differs from cached data, re-upload it.
         else:
             if file_in_local_collection:
@@ -834,11 +834,11 @@ class ArvPutUploadJob(object):
         if self.use_cache:
             cache_filepath = self._get_cache_filepath()
             if self.resume and os.path.exists(cache_filepath):
-                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
+                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'a+')
             else:
                 # --no-resume means start with a empty cache file.
-                self.logger.info("Creating new cache file at {}".format(cache_filepath))
+                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'w+')
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
@@ -924,7 +924,7 @@ class ArvPutUploadJob(object):
         try:
             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
         except IOError:
-            raise ResumeCacheConflict("{} locked".format(fileobj.name))
+            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
 
     def _save_state(self):
         """
@@ -1234,9 +1234,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     else:
         try:
             if args.update_collection:
-                logger.info("Collection updated: '{}'".format(writer.collection_name()))
+                logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
             else:
-                logger.info("Collection saved as '{}'".format(writer.collection_name()))
+                logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
index 5c781b158350fa6d7d869b061b24d0cfd95dcae1..ffca23495c475828184b31e355cfdaa42b6e0bdc 100644 (file)
@@ -46,7 +46,7 @@ setup(name='arvados-python-client',
           ('share/doc/arvados-python-client', ['LICENSE-2.0.txt', 'README.rst']),
       ],
       install_requires=[
-          'ciso8601 >=1.0.6, <2.0.0',
+          'ciso8601 >=2.0.0',
           'future',
           'google-api-python-client >=1.6.2, <1.7',
           'httplib2 >=0.9.2',
@@ -54,8 +54,11 @@ setup(name='arvados-python-client',
           'ruamel.yaml >=0.15.54, <=0.15.77',
           'setuptools',
           'ws4py >=0.4.2',
-          'subprocess32 >=3.5.1',
       ],
+      extras_require={
+          ':os.name=="posix" and python_version<"3"': ['subprocess32 >= 3.5.1'],
+          ':python_version<"3"': ['pytz'],
+      },
       classifiers=[
           'Programming Language :: Python :: 2',
           'Programming Language :: Python :: 3',
index a41184d10fb4fe7daadeb0892cf60ce36f47e8df..01a52a5e6681ec07daaf16eb0c0c18a9b7ba2ada 100644 (file)
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+
 # Copyright (C) The Arvados Authors. All rights reserved.
 #
 # SPDX-License-Identifier: Apache-2.0
@@ -1284,6 +1286,16 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                          r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
         self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
 
+    def test_unicode_on_filename(self):
+        tmpdir = self.make_tmpdir()
+        fname = u"i❤arvados.txt"
+        with open(os.path.join(tmpdir, fname), 'w') as f:
+            f.write("This is a unicode named file")
+        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+        self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
+
     def test_silent_mode_no_errors(self):
         self.authorize_with('active')
         tmpdir = self.make_tmpdir()
index 6cc3072b77e175c5ddc91cd930fccb6ed213b975..f236ce83a30a47e51c8e8499abb7b5aecfe29c56 100644 (file)
@@ -369,7 +369,11 @@ module Arv
       end
 
       def add_copy(src_item, key)
-        self[key] = src_item.copy_named("#{path}/#{key}")
+        if key == "."
+          self[key] = src_item.copy_named("#{path}")
+        else
+          self[key] = src_item.copy_named("#{path}/#{key}")
+        end
       end
 
       def merge(src_item, key)
@@ -457,6 +461,10 @@ module Arv
         items["."] = CollectionStream.new(".")
       end
 
+      def add_copy(src_item, key)
+        items["."].add_copy(src_item, key)
+      end
+
       def raise_root_write_error(key)
         raise ArgumentError.new("can't write to %p at collection root" % key)
       end
index f34e58a6b5a48dcb18b18a89f4affa481ecbe3db..288fd263fa8bdbe69cff943446dd2c0f3db9bc04 100644 (file)
@@ -385,6 +385,16 @@ class CollectionTest < Minitest::Test
                  dst_coll.manifest_text)
   end
 
+  def test_copy_root_into_empty_collection
+    block = random_block(8)
+    src_coll = Arv::Collection.new(". #{block} 0:8:f1\n")
+    dst_coll = Arv::Collection.new()
+    dst_coll.cp_r("./", ".", src_coll)
+    assert_equal(". %s 0:8:f1\n" %
+                 [block],
+                 dst_coll.manifest_text)
+  end
+
   def test_copy_empty_source_path_raises_ArgumentError(src="", dst="./s1")
     coll = Arv::Collection.new(SIMPLEST_MANIFEST)
     assert_raises(ArgumentError) do
index da4c39d9014431fe750a78fa3c1bdb07232a0bff..2cda8bcb1441967135b1feb5c2adf344e970da81 100644 (file)
@@ -30,3 +30,6 @@
 
 # Generated git-commit.version file
 /git-commit.version
+
+# Generated when building distribution packages
+/package-build.version
index 3d1e6491150624bcbcbad9aa95a5e4dc202dd390..abcfdbd296b3ab71cf7e8466e7c9279076f2c93f 100644 (file)
@@ -382,6 +382,9 @@ class Container < ArvadosModel
     else
       kwargs = {}
     end
+    if users_list.select { |u| u.is_admin }.any?
+      return super
+    end
     Container.where(ContainerRequest.readable_by(*users_list).where("containers.uuid = container_requests.container_uuid").exists)
   end
 
index 5750d5ebbd0394d99bdce8ae2a038bb1d799cd77..178135ead87098b23874b3eeb607437458ee2eb0 100644 (file)
@@ -723,6 +723,14 @@ class ContainerTest < ActiveSupport::TestCase
     assert_equal 1, Container.readable_by(users(:active)).where(state: "Queued").count
   end
 
+  test "Containers with no matching request are readable by admin" do
+    uuids = Container.includes('container_requests').where(container_requests: {uuid: nil}).collect(&:uuid)
+    assert_not_empty uuids
+    assert_empty Container.readable_by(users(:active)).where(uuid: uuids)
+    assert_not_empty Container.readable_by(users(:admin)).where(uuid: uuids)
+    assert_equal uuids.count, Container.readable_by(users(:admin)).where(uuid: uuids).count
+  end
+
   test "Container locked cancel" do
     set_user_from_auth :active
     c, _ = minimal_new
index a50853837085f6b7a6fd89bb61eba381dc9f6098..b3c530e69013e91dfb7599c677347ecef3856d78 100644 (file)
@@ -8,7 +8,6 @@ import (
        "encoding/json"
        "fmt"
        "io"
-       "io/ioutil"
        "os"
        "os/exec"
        "path/filepath"
@@ -25,16 +24,17 @@ var (
 
 // procinfo is saved in each process's lockfile.
 type procinfo struct {
-       UUID   string
-       PID    int
-       Stdout string
-       Stderr string
+       UUID string
+       PID  int
 }
 
 // Detach acquires a lock for the given uuid, and starts the current
 // program as a child process (with -no-detach prepended to the given
 // arguments so the child knows not to detach again). The lock is
 // passed along to the child process.
+//
+// Stdout and stderr in the child process are sent to the systemd
+// journal using the systemd-cat program.
 func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
        return exitcode(stderr, detach(uuid, args, stdout, stderr))
 }
@@ -49,14 +49,15 @@ func detach(uuid string, args []string, stdout, stderr io.Writer) error {
                        return nil, err
                }
                defer dirlock.Close()
-               lockfile, err := os.OpenFile(filepath.Join(lockdir, lockprefix+uuid+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+               lockfilename := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
+               lockfile, err := os.OpenFile(lockfilename, os.O_CREATE|os.O_RDWR, 0700)
                if err != nil {
-                       return nil, err
+                       return nil, fmt.Errorf("open %s: %s", lockfilename, err)
                }
                err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
                if err != nil {
                        lockfile.Close()
-                       return nil, err
+                       return nil, fmt.Errorf("lock %s: %s", lockfilename, err)
                }
                return lockfile, nil
        }()
@@ -66,21 +67,7 @@ func detach(uuid string, args []string, stdout, stderr io.Writer) error {
        defer lockfile.Close()
        lockfile.Truncate(0)
 
-       outfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stdout-")
-       if err != nil {
-               return err
-       }
-       defer outfile.Close()
-       errfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stderr-")
-       if err != nil {
-               os.Remove(outfile.Name())
-               return err
-       }
-       defer errfile.Close()
-
-       cmd := exec.Command(args[0], append([]string{"-no-detach"}, args[1:]...)...)
-       cmd.Stdout = outfile
-       cmd.Stderr = errfile
+       cmd := exec.Command("systemd-cat", append([]string{"--identifier=crunch-run", args[0], "-no-detach"}, args[1:]...)...)
        // Child inherits lockfile.
        cmd.ExtraFiles = []*os.File{lockfile}
        // Ensure child isn't interrupted even if we receive signals
@@ -89,24 +76,14 @@ func detach(uuid string, args []string, stdout, stderr io.Writer) error {
        cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
        err = cmd.Start()
        if err != nil {
-               os.Remove(outfile.Name())
-               os.Remove(errfile.Name())
-               return err
+               return fmt.Errorf("exec %s: %s", cmd.Path, err)
        }
 
        w := io.MultiWriter(stdout, lockfile)
-       err = json.NewEncoder(w).Encode(procinfo{
-               UUID:   uuid,
-               PID:    cmd.Process.Pid,
-               Stdout: outfile.Name(),
-               Stderr: errfile.Name(),
+       return json.NewEncoder(w).Encode(procinfo{
+               UUID: uuid,
+               PID:  cmd.Process.Pid,
        })
-       if err != nil {
-               os.Remove(outfile.Name())
-               os.Remove(errfile.Name())
-               return err
-       }
-       return nil
 }
 
 // KillProcess finds the crunch-run process corresponding to the given
@@ -123,14 +100,14 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
        if os.IsNotExist(err) {
                return nil
        } else if err != nil {
-               return err
+               return fmt.Errorf("open %s: %s", path, err)
        }
        defer f.Close()
 
        var pi procinfo
        err = json.NewDecoder(f).Decode(&pi)
        if err != nil {
-               return fmt.Errorf("%s: %s\n", path, err)
+               return fmt.Errorf("decode %s: %s", path, err)
        }
 
        if pi.UUID != uuid || pi.PID == 0 {
@@ -139,7 +116,7 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
 
        proc, err := os.FindProcess(pi.PID)
        if err != nil {
-               return err
+               return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
        }
 
        err = proc.Signal(signal)
@@ -147,16 +124,19 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
                err = proc.Signal(syscall.Signal(0))
        }
        if err == nil {
-               return fmt.Errorf("pid %d: sent signal %d (%s) but process is still alive", pi.PID, signal, signal)
+               return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
        }
-       fmt.Fprintf(stderr, "pid %d: %s\n", pi.PID, err)
+       fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
        return nil
 }
 
 // List UUIDs of active crunch-run processes.
 func ListProcesses(stdout, stderr io.Writer) int {
-       return exitcode(stderr, filepath.Walk(lockdir, func(path string, info os.FileInfo, err error) error {
-               if info.IsDir() {
+       // filepath.Walk does not follow symlinks, so we must walk
+       // lockdir+"/." in case lockdir itself is a symlink.
+       walkdir := lockdir + "/."
+       return exitcode(stderr, filepath.Walk(walkdir, func(path string, info os.FileInfo, err error) error {
+               if info.IsDir() && path != walkdir {
                        return filepath.SkipDir
                }
                if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
@@ -186,7 +166,7 @@ func ListProcesses(stdout, stderr io.Writer) int {
                        err := os.Remove(path)
                        dirlock.Close()
                        if err != nil {
-                               fmt.Fprintln(stderr, err)
+                               fmt.Fprintf(stderr, "unlink %s: %s\n", f.Name(), err)
                        }
                        return nil
                }
@@ -224,14 +204,15 @@ func exitcode(stderr io.Writer, err error) int {
 //
 // Caller releases the lock by closing the returned file.
 func lockall() (*os.File, error) {
-       f, err := os.OpenFile(filepath.Join(lockdir, lockprefix+"all"+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+       lockfile := filepath.Join(lockdir, lockprefix+"all"+locksuffix)
+       f, err := os.OpenFile(lockfile, os.O_CREATE|os.O_RDWR, 0700)
        if err != nil {
-               return nil, err
+               return nil, fmt.Errorf("open %s: %s", lockfile, err)
        }
        err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
        if err != nil {
                f.Close()
-               return nil, err
+               return nil, fmt.Errorf("lock %s: %s", lockfile, err)
        }
        return f, nil
 }
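
A sketch of the flock convention these lock files follow: each detached crunch-run keeps an exclusive flock on its per-container lock file, so a non-blocking lock attempt tells another process whether that supervisor is still alive. The literal path below is a placeholder; the real lockdir/lockprefix/locksuffix values are defined elsewhere in this file, and the helper name is hypothetical.

package main

import (
	"fmt"
	"os"
	"syscall"
)

// processIsRunning reports whether some process still holds the exclusive
// flock on the given container's lock file (Linux semantics assumed).
func processIsRunning(uuid string) (bool, error) {
	f, err := os.Open("/var/lock/crunch-run-" + uuid + ".lock") // placeholder path
	if os.IsNotExist(err) {
		return false, nil
	} else if err != nil {
		return false, err
	}
	defer f.Close()
	err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)
	if err == syscall.EWOULDBLOCK {
		return true, nil // another process holds the exclusive lock
	} else if err != nil {
		return false, err
	}
	return false, nil
}

func main() {
	running, err := processIsRunning("zzzzz-dz642-zzzzzzzzzzzzzzz")
	fmt.Println(running, err)
}
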
index 2b9a119581dfd7c4f3245b1e57317ae95155f5b9..0576337aa13c280841187db3a7aea2dcf4af65c0 100644 (file)
@@ -1737,6 +1737,7 @@ func main() {
        cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
        detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+       stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
        sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
        list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1766,6 +1767,13 @@ func main() {
 
        flag.Parse()
 
+       if *stdinEnv && !ignoreDetachFlag {
+               // Load env vars on stdin if asked (but not in a
+               // detached child process, in which case stdin is
+               // /dev/null).
+               loadEnv(os.Stdin)
+       }
+
        switch {
        case *detach && !ignoreDetachFlag:
                os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
@@ -1856,3 +1864,21 @@ func main() {
                log.Fatalf("%s: %v", containerId, runerr)
        }
 }
+
+func loadEnv(rdr io.Reader) {
+       buf, err := ioutil.ReadAll(rdr)
+       if err != nil {
+               log.Fatalf("read stdin: %s", err)
+       }
+       var env map[string]string
+       err = json.Unmarshal(buf, &env)
+       if err != nil {
+               log.Fatalf("decode stdin: %s", err)
+       }
+       for k, v := range env {
+               err = os.Setenv(k, v)
+               if err != nil {
+                       log.Fatalf("setenv(%q): %s", k, err)
+               }
+       }
+}
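
For context, a hypothetical sketch of how a caller might drive the new -stdin-env flag, piping a JSON object of environment variables to crunch-run's stdin; the variable names, token value, and container UUID below are placeholders.

    package main

    import (
    	"bytes"
    	"encoding/json"
    	"log"
    	"os"
    	"os/exec"
    )

    func main() {
    	// Environment to forward; these keys and values are illustrative
    	// placeholders, not a required set of variables.
    	env := map[string]string{
    		"ARVADOS_API_HOST":  "zzzzz.example.com",
    		"ARVADOS_API_TOKEN": "example-token-not-real",
    	}
    	buf, err := json.Marshal(env)
    	if err != nil {
    		log.Fatal(err)
    	}

    	// With -stdin-env, crunch-run reads this JSON object from stdin
    	// and applies it with os.Setenv before dispatching to the
    	// requested action.
    	cmd := exec.Command("crunch-run", "-stdin-env", "zzzzz-dz642-xxxxxxxxxxxxxxx")
    	cmd.Stdin = bytes.NewReader(buf)
    	cmd.Stdout = os.Stdout
    	cmd.Stderr = os.Stderr
    	if err := cmd.Run(); err != nil {
    		log.Fatal(err)
    	}
    }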
index 2a3a19c54c66005a6f96cd8d1dbd6de3c6345aad..acebe2b1b7eaaffdc642a93849be9d3ab03b2e02 100644 (file)
@@ -12,7 +12,7 @@ def convertTime(t):
     if not t:
         return 0
     try:
-        return calendar.timegm(ciso8601.parse_datetime_unaware(t).timetuple())
+        return calendar.timegm(ciso8601.parse_datetime(t).timetuple())
     except (TypeError, ValueError):
         return 0
 
index 5a1aa809146db0f4b5a89e32390877963302e9e6..9b4b997cdc68dd14353e4621a77da7f30f7146a1 100644 (file)
@@ -42,9 +42,12 @@ setup(name='arvados_fuse',
         # llfuse 1.3.4 fails to install via pip
         'llfuse >=1.2, <1.3.4',
         'python-daemon',
-        'ciso8601 >=1.0.6, <2.0.0',
+        'ciso8601 >= 2.0.0',
         'setuptools'
         ],
+      extras_require={
+          ':python_version<"3"': ['pytz'],
+      },
       test_suite='tests',
       tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
       zip_safe=False
index 9b957815c84bfe7d3c41425646cf147fa38c4098..66956b89ee83928261bc67dcedba075c20b78397 100644 (file)
@@ -610,6 +610,9 @@ func (v *AzureBlobVolume) translateError(err error) error {
        switch {
        case err == nil:
                return err
+       case strings.Contains(err.Error(), "StatusCode=503"):
+               // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804)
+               return VolumeBusyError
        case strings.Contains(err.Error(), "Not Found"):
                // "storage: service returned without a response body (404 Not Found)"
                return os.ErrNotExist
index cbfc0bcdab992cf4d729cc7c1a1d2f03effe538c..ad907ef10138f213e3831223d867fd3c114736d9 100644 (file)
@@ -50,6 +50,7 @@ type RequestTester struct {
 //   - permissions on, authenticated request, unsigned locator
 //   - permissions on, unauthenticated request, signed locator
 //   - permissions on, authenticated request, expired locator
+//   - permissions on, authenticated request, signed locator, transient error from backend
 //
 func TestGetHandler(t *testing.T) {
        defer teardown()
@@ -152,6 +153,23 @@ func TestGetHandler(t *testing.T) {
        ExpectStatusCode(t,
                "Authenticated request, expired locator",
                ExpiredError.HTTPCode, response)
+
+       // Authenticated request, signed locator
+       // => 503 Server busy (transient error)
+
+       // Set up the block-owning volume to respond with errors
+       vols[0].(*MockVolume).Bad = true
+       vols[0].(*MockVolume).BadVolumeError = VolumeBusyError
+       response = IssueRequest(&RequestTester{
+               method:   "GET",
+               uri:      signedLocator,
+               apiToken: knownToken,
+       })
+       // A transient error from one volume, combined with "not found" from the
+       // other, should make the service return a 503 so that clients can retry.
+       ExpectStatusCode(t,
+               "Volume backend busy",
+               503, response)
 }
 
 // Test PutBlockHandler on the following situations:
index 7da9f69adbae4c5fa2f21fe24128d66355bd7838..51dd73a513c1d4c729a6743aaabe0cefa1202c4b 100644 (file)
@@ -675,6 +675,11 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
                        if !os.IsNotExist(err) {
                                log.Printf("%s: Get(%s): %s", vol, hash, err)
                        }
+                       // If some volume returns a transient error, return it to the caller
+                       // instead of "Not found" so the caller knows to retry.
+                       if err == VolumeBusyError {
+                               errorToCaller = err.(*KeepError)
+                       }
                        continue
                }
                // Check the file checksum.
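
To make the intent of the errorToCaller change concrete, here is a simplified standalone sketch of the same priority rule: a transient 503 from any volume wins over a plain "not found", so the caller retries instead of giving up. The keepError type and the volume closures are stand-ins, not keepstore's real types.

    package main

    import (
    	"fmt"
    	"os"
    )

    // keepError is a stand-in for keepstore's KeepError type.
    type keepError struct {
    	HTTPCode int
    	ErrMsg   string
    }

    func (e *keepError) Error() string { return e.ErrMsg }

    var (
    	notFoundError   = &keepError{404, "Not Found"}
    	volumeBusyError = &keepError{503, "Volume backend busy"}
    )

    // getBlock tries each volume in turn. A busy volume upgrades the
    // eventual error from 404 to 503 so clients retry instead of giving up.
    func getBlock(volumes []func() ([]byte, error)) ([]byte, error) {
    	var errorToCaller error = notFoundError
    	for _, get := range volumes {
    		buf, err := get()
    		if err == nil {
    			return buf, nil
    		}
    		if err == volumeBusyError {
    			errorToCaller = volumeBusyError
    		}
    	}
    	return nil, errorToCaller
    }

    func main() {
    	busy := func() ([]byte, error) { return nil, volumeBusyError }
    	missing := func() ([]byte, error) { return nil, os.ErrNotExist }
    	_, err := getBlock([]func() ([]byte, error){busy, missing})
    	fmt.Println(err) // prints "Volume backend busy" (503), not 404
    }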
index fb1e1ea54516ef9375a0ea8aad91938ea84f3e7f..fcbdddacb1d585e995c8f23a0528be2ce8c1723c 100644 (file)
@@ -51,6 +51,7 @@ var (
        DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
        ExpiredError        = &KeepError{401, "Expired permission signature"}
        NotFoundError       = &KeepError{404, "Not Found"}
+       VolumeBusyError     = &KeepError{503, "Volume backend busy"}
        GenericError        = &KeepError{500, "Fail"}
        FullError           = &KeepError{503, "Full"}
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
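
The 503 code matters because clients treat it as retryable. Below is a hedged sketch of a client-side retry loop on 503 responses; the endpoint, locator, and backoff policy are arbitrary examples, not the Keep client's actual behavior.

    package main

    import (
    	"fmt"
    	"io/ioutil"
    	"net/http"
    	"time"
    )

    // getWithRetry retries GETs that fail with 503 (e.g. "Volume backend
    // busy"), since those indicate a transient condition rather than a
    // missing block.
    func getWithRetry(url string, attempts int) ([]byte, error) {
    	var lastStatus int
    	for i := 0; i < attempts; i++ {
    		resp, err := http.Get(url)
    		if err != nil {
    			return nil, err
    		}
    		body, err := ioutil.ReadAll(resp.Body)
    		resp.Body.Close()
    		if err != nil {
    			return nil, err
    		}
    		if resp.StatusCode == http.StatusOK {
    			return body, nil
    		}
    		lastStatus = resp.StatusCode
    		if resp.StatusCode != http.StatusServiceUnavailable {
    			break // 404, 401, etc. are not worth retrying
    		}
    		time.Sleep(time.Duration(i+1) * time.Second) // simple linear backoff
    	}
    	return nil, fmt.Errorf("request failed with status %d", lastStatus)
    }

    func main() {
    	// The URL is a placeholder; a real Keep locator would include a
    	// permission signature.
    	_, err := getWithRetry("http://keep.example.com/d41d8cd98f00b204e9800998ecf8427e+0", 3)
    	fmt.Println(err)
    }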
index 26d49946a4555647f88e41658d2f7a2f949c830b..d1d380466ba5983d4a7752c95ff47cf3e9312a75 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "bytes"
        "context"
+       "errors"
        "fmt"
        "io/ioutil"
        "os"
@@ -165,6 +166,7 @@ func TestPutBlockOneVol(t *testing.T) {
 
        vols := KeepVM.AllWritable()
        vols[0].(*MockVolume).Bad = true
+       vols[0].(*MockVolume).BadVolumeError = errors.New("Bad volume")
 
        // Check that PutBlock stores the data as expected.
        if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
index fd1a56c5eb5cd6412e6703141306c29693738e93..df6a09e3ab56fbd80f6776c20cdb881e83df9233 100644 (file)
@@ -42,7 +42,8 @@ type MockVolume struct {
        Timestamps map[string]time.Time
 
        // Bad volumes return an error for every operation.
-       Bad bool
+       Bad            bool
+       BadVolumeError error
 
        // Touchable volumes' Touch() method succeeds for a locator
        // that has been Put().
@@ -106,7 +107,7 @@ func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error
        v.gotCall("Compare")
        <-v.Gate
        if v.Bad {
-               return errors.New("Bad volume")
+               return v.BadVolumeError
        } else if block, ok := v.Store[loc]; ok {
                if fmt.Sprintf("%x", md5.Sum(block)) != loc {
                        return DiskHashError
@@ -124,7 +125,7 @@ func (v *MockVolume) Get(ctx context.Context, loc string, buf []byte) (int, erro
        v.gotCall("Get")
        <-v.Gate
        if v.Bad {
-               return 0, errors.New("Bad volume")
+               return 0, v.BadVolumeError
        } else if block, ok := v.Store[loc]; ok {
                copy(buf[:len(block)], block)
                return len(block), nil
@@ -136,7 +137,7 @@ func (v *MockVolume) Put(ctx context.Context, loc string, block []byte) error {
        v.gotCall("Put")
        <-v.Gate
        if v.Bad {
-               return errors.New("Bad volume")
+               return v.BadVolumeError
        }
        if v.Readonly {
                return MethodDisabledError
@@ -164,7 +165,7 @@ func (v *MockVolume) Mtime(loc string) (time.Time, error) {
        var mtime time.Time
        var err error
        if v.Bad {
-               err = errors.New("Bad volume")
+               err = v.BadVolumeError
        } else if t, ok := v.Timestamps[loc]; ok {
                mtime = t
        } else {
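
The BadVolumeError field turns a hard-coded failure into a configurable one. Here is a small illustrative sketch of that test-double pattern, with an injected error that tests can vary per case; the fakeStore type is a stand-in, not keepstore's MockVolume.

    package main

    import (
    	"errors"
    	"fmt"
    )

    // fakeStore is a test double whose failure mode is configurable:
    // tests set Bad plus the specific error every operation should return.
    type fakeStore struct {
    	Bad      bool
    	BadError error
    	data     map[string][]byte
    }

    func (s *fakeStore) Get(key string) ([]byte, error) {
    	if s.Bad {
    		return nil, s.BadError
    	}
    	if b, ok := s.data[key]; ok {
    		return b, nil
    	}
    	return nil, errors.New("not found")
    }

    func main() {
    	s := &fakeStore{data: map[string][]byte{"x": []byte("hello")}}

    	// Default behavior: the store works.
    	fmt.Println(s.Get("x"))

    	// Inject a transient failure so the code under test sees exactly
    	// the error a busy backend would produce.
    	s.Bad = true
    	s.BadError = errors.New("backend busy")
    	fmt.Println(s.Get("x"))
    }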
index 7bd1498158304ea3ab8a969c0c90129241ab1028..63bdb49e5b6ad8675b26077deba8e29ead2e5b0f 100644 (file)
@@ -44,7 +44,7 @@ type v0session struct {
        permChecker   permChecker
        subscriptions []v0subscribe
        lastMsgID     uint64
-       log           *logrus.Entry
+       log           logrus.FieldLogger
        mtx           sync.Mutex
        setupOnce     sync.Once
 }
index 68c87233f0001b25a05e38917a3b1356fa49822c..6f13ee0278f8c67c333b03f338c998c741a8d9a8 100755 (executable)
@@ -44,6 +44,7 @@ $RAILS_ENV:
   arvados_docsite: http://$localip:${services[doc]}/
   force_ssl: false
   composer_url: http://$localip:${services[composer]}
+  workbench2_url: https://$localip:${services[workbench2-ssl]}
 EOF
 
 bundle exec rake assets:precompile