# npm-rails
/node_modules
/npm-debug.log
+
+# Generated when building distribution packages
+/package-build.version
</form>
</li>
<% end %>
+ <% if Rails.configuration.workbench2_url %>
+ <li role="menuitem">
+ <%
+ wb2_url = Rails.configuration.workbench2_url
+ wb2_url += '/' if wb2_url[-1] != '/'
+ wb2_url += 'token'
+ %>
+ <form action="<%= wb2_url %>" method="GET">
+ <input type="hidden" name="api_token" value="<%= Thread.current[:arvados_api_token] %>">
+ <button role="menuitem" type="submit">
+ <i class="fa fa-lg fa-share-square fa-fw"></i> Go to Workbench 2
+ </button>
+ </form>
+ </li>
+ <% end %>
<li role="menuitem">
<%= link_to virtual_machines_user_path(current_user), role: 'menu-item' do %>
<i class="fa fa-lg fa-terminal fa-fw"></i> Virtual machines
# the jobs api is disabled and there are no local git repositories.
#
repositories: true
+
+ #
+ # When set to a URL (not false), add a "Go to Workbench 2" item to the user menu pointing to that address.
+ #
+ # Example:
+ # workbench2_url: https://workbench2.qr1hi.arvadosapi.com
+ #
+ workbench2_url: false
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+include ConfigValidators
+
+ConfigValidators::validate_wb2_url_config()
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require 'uri'
+
+module ConfigValidators
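+ # Returns true when workbench2_url is a valid http(s) URL with no redundant
+ # trailing slashes; otherwise logs a warning, resets the setting to false,
+ # and returns false.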
+ def validate_wb2_url_config
+ if Rails.configuration.workbench2_url
+ begin
+ if !URI.parse(Rails.configuration.workbench2_url).is_a?(URI::HTTP)
+ Rails.logger.warn("workbench2_url config is not an HTTP URL: #{Rails.configuration.workbench2_url}")
+ Rails.configuration.workbench2_url = false
+ elsif /.*[\/]{2,}$/.match(Rails.configuration.workbench2_url)
+ Rails.logger.warn("workbench2_url config shouldn't have multiple trailing slashes: #{Rails.configuration.workbench2_url}")
+ Rails.configuration.workbench2_url = false
+ else
+ return true
+ end
+ rescue URI::InvalidURIError
+ Rails.logger.warn("workbench2_url config invalid URL: #{Rails.configuration.workbench2_url}")
+ Rails.configuration.workbench2_url = false
+ end
+ end
+ return false
+ end
+end
+
+++ /dev/null
-1.2.1.20181126194329
end
end
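+ # Each entry pairs a workbench2_url config value with whether the "Go to Workbench 2" menu item should appear.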
+ [
+ [false, false],
+ ['http://wb2.example.org//', false],
+ ['ftp://wb2.example.org', false],
+ ['wb2.example.org', false],
+ ['http://wb2.example.org', true],
+ ['https://wb2.example.org', true],
+ ['http://wb2.example.org/', true],
+ ['https://wb2.example.org/', true],
+ ].each do |wb2_url_config, wb2_menu_appear|
+ test "workbench2_url=#{wb2_url_config} should#{wb2_menu_appear ? '' : ' not'} show WB2 menu" do
+ Rails.configuration.workbench2_url = wb2_url_config
+ assert_equal wb2_menu_appear, ConfigValidators::validate_wb2_url_config()
+
+ visit page_with_token('active')
+ within('.navbar-fixed-top') do
+ page.find("#notifications-menu").click
+ within('.dropdown-menu') do
+ assert_equal wb2_menu_appear, page.has_text?('Go to Workbench 2')
+ end
+ end
+ end
+ end
+
[
['active', true],
['active_with_prefs_profile_no_getting_started_shown', false],
+++ /dev/null
-+ echo -n 'geckodriver: '
-+ which geckodriver || fatal "No geckodriver. Unable to find Mozilla geckodriver. Please download the server from https://github.com/mozilla/geckodriver/releases and place it somewhere on your PATH. More info at https://developer.mozilla.org/en-US/docs/Mozilla/QA/Marionette/WebDriver."
-
RUN touch /var/lib/rpm/* && yum -q -y install rh-python35
RUN scl enable rh-python35 "easy_install-3.5 pip" && easy_install-2.7 pip
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
# Add epel, we need it for the python-pam dependency
RUN wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
RUN rpm -ivh epel-release-latest-7.noarch.rpm
RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
+# The version of setuptools that comes with CentOS is too old to build some of our Python packages (e.g. schema-salad)
+RUN pip install --upgrade setuptools
+
ENV WORKSPACE /arvados
CMD ["scl", "enable", "rh-python35", "/usr/local/rvm/bin/rvm-exec default bash /jenkins/run-build-packages.sh --target centos7"]
ENV DEBIAN_FRONTEND noninteractive
# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip python3-venv python3-dev
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools python3-pip libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip python3-venv python3-dev
# Install virtualenv
RUN /usr/bin/pip install virtualenv
ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
ENV WORKSPACE /arvados
ENV DEBIAN_FRONTEND noninteractive
# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip python3-venv python3-dev
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools python3-pip libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip python3-venv python3-dev
# Install virtualenv
RUN /usr/bin/pip install virtualenv
ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
ENV WORKSPACE /arvados
ENV DEBIAN_FRONTEND noninteractive
# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip python3.4-venv python3.4-dev
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools python3-pip libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip python3.4-venv python3.4-dev
# Install virtualenv
RUN /usr/bin/pip install virtualenv
ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
ENV WORKSPACE /arvados
ENV DEBIAN_FRONTEND noninteractive
# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev libgnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip tzdata python3-venv python3-dev
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools python3-pip libcurl4-gnutls-dev libgnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip tzdata python3-venv python3-dev
# Install virtualenv
RUN /usr/bin/pip install virtualenv
ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
ENV WORKSPACE /arvados
ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
ENV WORKSPACE /arvados
DASHQ_UNLESS_DEBUG=
fi
-EASY_INSTALL2=$(find_easy_install -$PYTHON2_VERSION "")
-EASY_INSTALL3=$(find_easy_install -$PYTHON3_VERSION 3)
-
RUN_BUILD_PACKAGES_PATH="`dirname \"$0\"`"
RUN_BUILD_PACKAGES_PATH="`( cd \"$RUN_BUILD_PACKAGES_PATH\" && pwd )`" # absolutized and normalized
if [ -z "$RUN_BUILD_PACKAGES_PATH" ] ; then
# Python packages
debug_echo -e "\nPython packages\n"
-cd "$WORKSPACE/sdk/pam"
-handle_python_package
-
-cd "$WORKSPACE/sdk/python"
-handle_python_package
-
-cd "$WORKSPACE/sdk/cwl"
-handle_python_package
-
-cd "$WORKSPACE/services/fuse"
-handle_python_package
-
-cd "$WORKSPACE/services/nodemanager"
-handle_python_package
-
# arvados-src
(
cd "$WORKSPACE"
"Arvados server daemons"
package_go_binary cmd/arvados-server arvados-controller \
"Arvados cluster controller daemon"
-package_go_binary cmd/arvados-server crunch-dispatch-cloud \
+package_go_binary cmd/arvados-server arvados-dispatch-cloud \
"Arvados cluster cloud dispatch"
package_go_binary sdk/go/crunchrunner crunchrunner \
"Crunchrunner executes a command inside a container and uploads the output"
echo ${repo_pkg_list} |grep -q ${complete_pkgname}
if [ $? -eq 0 ] ; then
echo "Package $complete_pkgname exists, not rebuilding!"
- curl -o ./${complete_pkgname} http://apt.arvados.org/pool/${D}/main/${repo_subdir}/${complete_pkgname}
+ curl -s -o ./${complete_pkgname} http://apt.arvados.org/pool/${D}/main/${repo_subdir}/${complete_pkgname}
return 1
elif test -f "$WORKSPACE/packages/$TARGET/processed/${complete_pkgname}" ; then
echo "Package $complete_pkgname exists, not rebuilding!"
else
centos_repo="http://rpm.arvados.org/CentOS/7/dev/x86_64/"
- repo_pkg_list=$(curl -o - ${centos_repo})
+ repo_pkg_list=$(curl -s -o - ${centos_repo})
echo ${repo_pkg_list} |grep -q ${complete_pkgname}
if [ $? -eq 0 ]; then
echo "Package $complete_pkgname exists, not rebuilding!"
- curl -o ./${complete_pkgname} ${centos_repo}${complete_pkgname}
+ curl -s -o ./${complete_pkgname} ${centos_repo}${complete_pkgname}
return 1
elif test -f "$WORKSPACE/packages/$TARGET/processed/${complete_pkgname}" ; then
echo "Package $complete_pkgname exists, not rebuilding!"
local python=""
case "$PACKAGE_TYPE" in
+ python3)
+ python=python3
+ if [[ "$FORMAT" != "rpm" ]]; then
+ pip=pip3
+ else
+ # In CentOS, we use a different mechanism to get the right version of pip
+ pip=pip
+ fi
+ PACKAGE_PREFIX=$PYTHON3_PKG_PREFIX
+ ;;
python)
# All Arvados Python2 packages depend on Python 2.7.
# Make sure we build with that for consistency.
python=python2.7
+ pip=pip
PACKAGE_PREFIX=$PYTHON2_PKG_PREFIX
;;
- python3)
- PACKAGE_PREFIX=$PYTHON3_PKG_PREFIX
- python=python3
- ;;
esac
if [[ "$PKG" != "libpam-arvados" ]] &&
rm -rf dist/*
+ # Get the latest setuptools
+ if ! $pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools; then
+ echo "Error, unable to upgrade setuptools with"
+ echo " $pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools"
+ exit 1
+ fi
if ! $python setup.py $DASHQ_UNLESS_DEBUG sdist; then
- echo "Error, unable to run python setup.py sdist for $PKG"
+ echo "Error, unable to run $python setup.py sdist for $PKG"
exit 1
fi
exit 1
fi
- if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip; then
+ if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip; then
echo "Error, unable to upgrade pip with"
- echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip"
+ echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip"
+ exit 1
+ fi
+ if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools; then
+ echo "Error, unable to upgrade setuptools with"
+ echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools"
exit 1
fi
- if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel; then
+ if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel; then
echo "Error, unable to upgrade wheel with"
- echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel"
+ echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel"
exit 1
fi
if [[ "$TARGET" != "centos7" ]] || [[ "$PYTHON_PKG" != "python-arvados-fuse" ]]; then
- build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
+ build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
else
# centos7 needs these special tweaks to install python-arvados-fuse
- build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG docutils
- PYCURL_SSL_LIBRARY=nss build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
+ build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG docutils
+ PYCURL_SSL_LIBRARY=nss build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
fi
if [[ "$?" != "0" ]]; then
echo "Error, unable to run"
- echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH"
+ echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH"
exit 1
fi
lib/controller
lib/crunchstat
lib/cloud
+lib/cloud/azure
lib/dispatchcloud
lib/dispatchcloud/container
lib/dispatchcloud/scheduler
(
set -e
mkdir -p "$GOPATH/src/git.curoverse.com"
- rmdir -v --parents --ignore-fail-on-non-empty "${temp}/GOPATH"
if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
for d in \
"$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
lib/controller
lib/crunchstat
lib/cloud
+ lib/cloud/azure
lib/dispatchcloud
lib/dispatchcloud/container
lib/dispatchcloud/scheduler
# SPDX-License-Identifier: AGPL-3.0
[Unit]
-Description=Arvados cloud dispatch
+Description=arvados-dispatch-cloud
Documentation=https://doc.arvados.org/
After=network.target
AssertPathExists=/etc/arvados/config.yml
[Service]
Type=notify
EnvironmentFile=-/etc/arvados/environment
-ExecStart=/usr/bin/crunch-dispatch-cloud
+ExecStart=/usr/bin/arvados-dispatch-cloud
Restart=always
RestartSec=1
In order to start workflows from workbench, there needs to be a Docker image tagged @arvados/jobs:latest@. The following command downloads the latest arvados/jobs image from Docker Hub, loads it into Keep, and tags it as 'latest'. In this example, @$project_uuid@ should be the UUID of the "Arvados Standard Docker Images" project.
<notextile>
-<pre><code>~$ <span class="userinput">arv-keepdocker --project-uuid $project_uuid --pull arvados/jobs latest</span>
+<pre><code>~$ <span class="userinput">arv-keepdocker --pull arvados/jobs latest --project-uuid $project_uuid</span>
</code></pre></notextile>
If the image needs to be downloaded from Docker Hub, the command can take a few minutes to complete, depending on available network bandwidth.
|Debian 9 ("stretch")|Supported|Latest|
|Ubuntu 14.04 ("trusty")|Supported|Latest|
|Ubuntu 16.04 ("xenial")|Supported|Latest|
+|Ubuntu 18.04 ("bionic")|Supported|Latest|
|Ubuntu 12.04 ("precise")|EOL|8ed7b6dd5d4df93a3f37096afe6d6f81c2a7ef6e (2017-05-03)|
|Debian 7 ("wheezy")|EOL|997479d1408139e96ecdb42a60b4f727f814f6c9 (2016-12-28)|
|CentOS 6 |EOL|997479d1408139e96ecdb42a60b4f727f814f6c9 (2016-12-28)|
h3. Debian and Ubuntu
-Packages are available for Debian 8 ("jessie"), Debian 9 ("stretch"), Ubuntu 14.04 ("trusty") and Ubuntu 16.04 ("xenial").
+Packages are available for Debian 8 ("jessie"), Debian 9 ("stretch"), Ubuntu 14.04 ("trusty"), Ubuntu 16.04 ("xenial") and Ubuntu 18.04 ("bionic").
First, register the Curoverse signing key in apt's database:
|Debian 9 ("stretch")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ stretch main" | sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
|Ubuntu 14.04 ("trusty")[1]|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ trusty main" | sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
|Ubuntu 16.04 ("xenial")[1]|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ xenial main" | sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
+|Ubuntu 18.04 ("bionic")[1]|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ bionic main" | sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
{% include 'notebox_begin' %}
Docker images are subject to normal Arvados permissions. If you wish to share your Docker image with others (or wish to share a pipeline template that uses your Docker image), you will need to use @arv-keepdocker@ with the @--project-uuid@ option to upload the image to a shared project.
<notextile>
-<pre><code>$ <span class="userinput">arv-keepdocker --project-uuid qr1hi-j7d0g-xxxxxxxxxxxxxxx arvados/jobs-with-r</span>
+<pre><code>$ <span class="userinput">arv-keepdocker arvados/jobs-with-r --project-uuid qr1hi-j7d0g-xxxxxxxxxxxxxxx</span>
</code></pre>
</notextile>
//
// SPDX-License-Identifier: AGPL-3.0
-package cloud
+package azure
import (
"context"
"encoding/base64"
+ "encoding/json"
"fmt"
"net/http"
"regexp"
"sync"
"time"
+ "git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/Azure/go-autorest/autorest/to"
"github.com/jmcvetta/randutil"
- "github.com/mitchellh/mapstructure"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
-type AzureInstanceSetConfig struct {
- SubscriptionID string `mapstructure:"subscription_id"`
- ClientID string `mapstructure:"key"`
- ClientSecret string `mapstructure:"secret"`
- TenantID string `mapstructure:"tenant_id"`
- CloudEnv string `mapstructure:"cloud_environment"`
- ResourceGroup string `mapstructure:"resource_group"`
- Location string `mapstructure:"region"`
- Network string `mapstructure:"network"`
- Subnet string `mapstructure:"subnet"`
- StorageAccount string `mapstructure:"storage_account"`
- BlobContainer string `mapstructure:"blob_container"`
- Image string `mapstructure:"image"`
- DeleteDanglingResourcesAfter float64 `mapstructure:"delete_dangling_resources_after"`
-}
-
-type VirtualMachinesClientWrapper interface {
- CreateOrUpdate(ctx context.Context,
+// Driver is the azure implementation of the cloud.Driver interface.
+var Driver = cloud.DriverFunc(newAzureInstanceSet)
+
+type azureInstanceSetConfig struct {
+ SubscriptionID string
+ ClientID string
+ ClientSecret string
+ TenantID string
+ CloudEnvironment string
+ ResourceGroup string
+ Location string
+ Network string
+ Subnet string
+ StorageAccount string
+ BlobContainer string
+ DeleteDanglingResourcesAfter arvados.Duration
+ AdminUsername string
+}
+
+const tagKeyInstanceSecret = "InstanceSecret"
+
+type virtualMachinesClientWrapper interface {
+ createOrUpdate(ctx context.Context,
resourceGroupName string,
VMName string,
parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
- Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
- ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
+ delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
+ listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
}
-type VirtualMachinesClientImpl struct {
+type virtualMachinesClientImpl struct {
inner compute.VirtualMachinesClient
}
-func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
+func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
resourceGroupName string,
VMName string,
parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
if err != nil {
- return compute.VirtualMachine{}, WrapAzureError(err)
+ return compute.VirtualMachine{}, wrapAzureError(err)
}
future.WaitForCompletionRef(ctx, cl.inner.Client)
r, err := future.Result(cl.inner)
- return r, WrapAzureError(err)
+ return r, wrapAzureError(err)
}
-func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
+func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
err = future.WaitForCompletionRef(ctx, cl.inner.Client)
- return future.Response(), WrapAzureError(err)
+ return future.Response(), wrapAzureError(err)
}
-func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
+func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
r, err := cl.inner.ListComplete(ctx, resourceGroupName)
- return r, WrapAzureError(err)
+ return r, wrapAzureError(err)
}
-type InterfacesClientWrapper interface {
- CreateOrUpdate(ctx context.Context,
+type interfacesClientWrapper interface {
+ createOrUpdate(ctx context.Context,
resourceGroupName string,
networkInterfaceName string,
parameters network.Interface) (result network.Interface, err error)
- Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
- ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
+ delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
+ listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
}
-type InterfacesClientImpl struct {
+type interfacesClientImpl struct {
inner network.InterfacesClient
}
-func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
+func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
err = future.WaitForCompletionRef(ctx, cl.inner.Client)
- return future.Response(), WrapAzureError(err)
+ return future.Response(), wrapAzureError(err)
}
-func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
+func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
resourceGroupName string,
networkInterfaceName string,
parameters network.Interface) (result network.Interface, err error) {
future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
if err != nil {
- return network.Interface{}, WrapAzureError(err)
+ return network.Interface{}, wrapAzureError(err)
}
future.WaitForCompletionRef(ctx, cl.inner.Client)
r, err := future.Result(cl.inner)
- return r, WrapAzureError(err)
+ return r, wrapAzureError(err)
}
-func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
+func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
r, err := cl.inner.ListComplete(ctx, resourceGroupName)
- return r, WrapAzureError(err)
+ return r, wrapAzureError(err)
}
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
-type AzureRateLimitError struct {
+type azureRateLimitError struct {
azure.RequestError
- earliestRetry time.Time
+ firstRetry time.Time
}
-func (ar *AzureRateLimitError) EarliestRetry() time.Time {
- return ar.earliestRetry
+func (ar *azureRateLimitError) EarliestRetry() time.Time {
+ return ar.firstRetry
}
-type AzureQuotaError struct {
+type azureQuotaError struct {
azure.RequestError
}
-func (ar *AzureQuotaError) IsQuotaError() bool {
+func (ar *azureQuotaError) IsQuotaError() bool {
return true
}
-func WrapAzureError(err error) error {
+func wrapAzureError(err error) error {
de, ok := err.(autorest.DetailedError)
if !ok {
return err
earliestRetry = time.Now().Add(20 * time.Second)
}
}
- return &AzureRateLimitError{*rq, earliestRetry}
+ return &azureRateLimitError{*rq, earliestRetry}
}
if rq.ServiceError == nil {
return err
}
if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
- return &AzureQuotaError{*rq}
+ return &azureQuotaError{*rq}
}
return err
}
-type AzureInstanceSet struct {
- azconfig AzureInstanceSetConfig
- vmClient VirtualMachinesClientWrapper
- netClient InterfacesClientWrapper
+type azureInstanceSet struct {
+ azconfig azureInstanceSetConfig
+ vmClient virtualMachinesClientWrapper
+ netClient interfacesClientWrapper
storageAcctClient storageacct.AccountsClient
azureEnv azure.Environment
interfaces map[string]network.Interface
logger logrus.FieldLogger
}
-func NewAzureInstanceSet(config map[string]interface{}, dispatcherID InstanceSetID, logger logrus.FieldLogger) (prv InstanceSet, err error) {
- azcfg := AzureInstanceSetConfig{}
- if err = mapstructure.Decode(config, &azcfg); err != nil {
+func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+ azcfg := azureInstanceSetConfig{}
+ err = json.Unmarshal(config, &azcfg)
+ if err != nil {
return nil, err
}
- ap := AzureInstanceSet{logger: logger}
+
+ ap := azureInstanceSet{logger: logger}
err = ap.setup(azcfg, string(dispatcherID))
if err != nil {
return nil, err
return &ap, nil
}
-func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID string) (err error) {
+func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
az.azconfig = azcfg
vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
- az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnv)
+ az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
if err != nil {
return err
}
netClient.Authorizer = authorizer
storageAcctClient.Authorizer = authorizer
- az.vmClient = &VirtualMachinesClientImpl{vmClient}
- az.netClient = &InterfacesClientImpl{netClient}
+ az.vmClient = &virtualMachinesClientImpl{vmClient}
+ az.netClient = &interfacesClientImpl{netClient}
az.storageAcctClient = storageAcctClient
az.dispatcherID = dispatcherID
tk.Stop()
return
case <-tk.C:
- az.ManageBlobs()
+ az.manageBlobs()
}
}
}()
az.deleteNIC = make(chan string)
az.deleteBlob = make(chan storage.Blob)
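+ // Start a fixed pool of worker goroutines that delete dangling NICs queued on the deleteNIC channel.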
- for i := 0; i < 4; i += 1 {
+ for i := 0; i < 4; i++ {
go func() {
for {
nicname, ok := <-az.deleteNIC
if !ok {
return
}
- _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
+ _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
if delerr != nil {
az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
} else {
return nil
}
-func (az *AzureInstanceSet) Create(
+func (az *azureInstanceSet) Create(
instanceType arvados.InstanceType,
- imageId ImageID,
- newTags InstanceTags,
- publicKey ssh.PublicKey) (Instance, error) {
+ imageID cloud.ImageID,
+ newTags cloud.InstanceTags,
+ initCommand cloud.InitCommand,
+ publicKey ssh.PublicKey) (cloud.Instance, error) {
az.stopWg.Add(1)
defer az.stopWg.Done()
- if len(newTags["node-token"]) == 0 {
- return nil, fmt.Errorf("Must provide tag 'node-token'")
- }
-
name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
if err != nil {
return nil, err
tags["dispatch-"+k] = &newstr
}
- tags["dispatch-instance-type"] = &instanceType.Name
-
nicParameters := network.Interface{
Location: &az.azconfig.Location,
Tags: tags,
},
},
}
- nic, err := az.netClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
+ nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
- instance_vhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
+ instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
az.azconfig.StorageAccount,
az.azureEnv.StorageEndpointSuffix,
az.azconfig.BlobContainer,
name)
- customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
-echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
+ customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
vmParameters := compute.VirtualMachine{
Location: &az.azconfig.Location,
Name: to.StringPtr(name + "-os"),
CreateOption: compute.FromImage,
Image: &compute.VirtualHardDisk{
- URI: to.StringPtr(string(imageId)),
+ URI: to.StringPtr(string(imageID)),
},
Vhd: &compute.VirtualHardDisk{
- URI: &instance_vhd,
+ URI: &instanceVhd,
},
},
},
},
OsProfile: &compute.OSProfile{
ComputerName: &name,
- AdminUsername: to.StringPtr("crunch"),
+ AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
LinuxConfiguration: &compute.LinuxConfiguration{
DisablePasswordAuthentication: to.BoolPtr(true),
SSH: &compute.SSHConfiguration{
PublicKeys: &[]compute.SSHPublicKey{
- compute.SSHPublicKey{
- Path: to.StringPtr("/home/crunch/.ssh/authorized_keys"),
+ {
+ Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
},
},
},
}
- vm, err := az.vmClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
+ vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
- return &AzureInstance{
+ return &azureInstance{
provider: az,
nic: nic,
vm: vm,
}, nil
}
-func (az *AzureInstanceSet) Instances(InstanceTags) ([]Instance, error) {
+func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
az.stopWg.Add(1)
defer az.stopWg.Done()
- interfaces, err := az.ManageNics()
+ interfaces, err := az.manageNics()
if err != nil {
return nil, err
}
- result, err := az.vmClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
+ result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
- instances := make([]Instance, 0)
+ instances := make([]cloud.Instance, 0)
for ; result.NotDone(); err = result.Next() {
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
- instances = append(instances, &AzureInstance{
+ instances = append(instances, &azureInstance{
provider: az,
vm: result.Value(),
nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
// not associated with a virtual machine and have a "created-at" time
// more than DeleteDanglingResourcesAfter (to prevent racing and
// deleting newly created NICs) in the past are deleted.
-func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
+func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
az.stopWg.Add(1)
defer az.stopWg.Done()
- result, err := az.netClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
+ result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
interfaces := make(map[string]network.Interface)
interfaces[*result.Value().ID] = result.Value()
} else {
if result.Value().Tags["created-at"] != nil {
- created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
+ createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
if err == nil {
- if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
- az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
+ if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
+ az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
az.deleteNIC <- *result.Value().Name
}
}
// have "namePrefix", are "available" (which means they are not
// leased to a VM) and haven't been modified for
// DeleteDanglingResourcesAfter seconds.
-func (az *AzureInstanceSet) ManageBlobs() {
+func (az *azureInstanceSet) manageBlobs() {
result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
if err != nil {
az.logger.WithError(err).Warn("Couldn't get account keys")
if b.Properties.BlobType == storage.BlobTypePage &&
b.Properties.LeaseState == "available" &&
b.Properties.LeaseStatus == "unlocked" &&
- age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
+ age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
az.deleteBlob <- b
}
}
-func (az *AzureInstanceSet) Stop() {
+func (az *azureInstanceSet) Stop() {
az.stopFunc()
az.stopWg.Wait()
close(az.deleteNIC)
close(az.deleteBlob)
}
-type AzureInstance struct {
- provider *AzureInstanceSet
+type azureInstance struct {
+ provider *azureInstanceSet
nic network.Interface
vm compute.VirtualMachine
}
-func (ai *AzureInstance) ID() InstanceID {
- return InstanceID(*ai.vm.ID)
+func (ai *azureInstance) ID() cloud.InstanceID {
+ return cloud.InstanceID(*ai.vm.ID)
}
-func (ai *AzureInstance) String() string {
+func (ai *azureInstance) String() string {
return *ai.vm.Name
}
-func (ai *AzureInstance) ProviderType() string {
+func (ai *azureInstance) ProviderType() string {
return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
}
-func (ai *AzureInstance) SetTags(newTags InstanceTags) error {
+func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
ai.provider.stopWg.Add(1)
defer ai.provider.stopWg.Done()
Location: &ai.provider.azconfig.Location,
Tags: tags,
}
- vm, err := ai.provider.vmClient.CreateOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
+ vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
if err != nil {
- return WrapAzureError(err)
+ return wrapAzureError(err)
}
ai.vm = vm
return nil
}
-func (ai *AzureInstance) Tags() InstanceTags {
+func (ai *azureInstance) Tags() cloud.InstanceTags {
tags := make(map[string]string)
for k, v := range ai.vm.Tags {
return tags
}
-func (ai *AzureInstance) Destroy() error {
+func (ai *azureInstance) Destroy() error {
ai.provider.stopWg.Add(1)
defer ai.provider.stopWg.Done()
- _, err := ai.provider.vmClient.Delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
- return WrapAzureError(err)
+ _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
+ return wrapAzureError(err)
}
-func (ai *AzureInstance) Address() string {
+func (ai *azureInstance) Address() string {
return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
}
-func (ai *AzureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
- ai.provider.stopWg.Add(1)
- defer ai.provider.stopWg.Done()
-
- remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
-
- tags := ai.Tags()
-
- tg := tags["ssh-pubkey-fingerprint"]
- if tg != "" {
- if remoteFingerprint == tg {
- return nil
- } else {
- return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
- }
- }
-
- nodetokenTag := tags["node-token"]
- if nodetokenTag == "" {
- return fmt.Errorf("Missing node token tag")
- }
-
- sess, err := client.NewSession()
- if err != nil {
- return err
- }
-
- nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
- if err != nil {
- return err
- }
-
- nodetoken := strings.TrimSpace(string(nodetokenbytes))
-
- expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
-
- if strings.TrimSpace(nodetoken) != expectedToken {
- return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
- }
-
- sess, err = client.NewSession()
- if err != nil {
- return err
- }
-
- keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
- if err != nil {
- return err
- }
-
- sp := strings.Split(string(keyfingerprintbytes), " ")
-
- if remoteFingerprint != sp[1] {
- return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
- }
+func (ai *azureInstance) RemoteUser() string {
+ return ai.provider.azconfig.AdminUsername
+}
- tags["ssh-pubkey-fingerprint"] = sp[1]
- delete(tags, "node-token")
- ai.SetTags(tags)
- return nil
+func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+ return cloud.ErrNotImplemented
}
// SPDX-License-Identifier: AGPL-3.0
//
//
-// How to manually run individual tests against the real cloud
+// How to manually run individual tests against the real cloud:
//
-// $ go test -v git.curoverse.com/arvados.git/lib/cloud -live-azure-cfg azconfig.yml -check.f=TestListInstances
+// $ go test -v git.curoverse.com/arvados.git/lib/cloud/azure -live-azure-cfg azconfig.yml -check.f=TestCreate
+//
+// Tests should be run individually and in the order they are listed in the file.
//
// Example azconfig.yml:
//
-// subscription_id: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
-// key: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
-// region: centralus
-// cloud_environment: AzurePublicCloud
-// secret: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
-// tenant_id: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
-// resource_group: zzzzz
-// network: zzzzz
-// subnet: zzzzz-subnet-private
-// storage_account: example
-// blob_container: vhds
-// image: "https://example.blob.core.windows.net/system/Microsoft.Compute/Images/images/zzzzz-compute-osDisk.XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.vhd"
-// delete_dangling_resources_after: 20
-// authorized_key: "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDLQS1ExT2+WjA0d/hntEAyAtgeN1W2ik2QX8c2zO6HjlPHWXL92r07W0WMuDib40Pcevpi1BXeBWXA9ZB5KKMJB+ukaAu22KklnQuUmNvk6ZXnPKSkGxuCYvPQb08WhHf3p1VxiKfP3iauedBDM4x9/bkJohlBBQiFXzNUcQ+a6rKiMzmJN2gbL8ncyUzc+XQ5q4JndTwTGtOlzDiGOc9O4z5Dd76wtAVJneOuuNpwfFRVHThpJM6VThpCZOnl8APaceWXKeuwOuCae3COZMz++xQfxOfZ9Z8aIwo+TlQhsRaNfZ4Vjrop6ej8dtfZtgUFKfbXEOYaHrGrWGotFDTD example@example"
-
-package cloud
+// ImageIDForTestSuite: "https://example.blob.core.windows.net/system/Microsoft.Compute/Images/images/zzzzz-compute-osDisk.XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.vhd"
+// DriverParameters:
+// SubscriptionID: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
+// ClientID: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
+// Location: centralus
+// CloudEnvironment: AzurePublicCloud
+// ClientSecret: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
+// TenantId: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
+// ResourceGroup: zzzzz
+// Network: zzzzz
+// Subnet: zzzzz-subnet-private
+// StorageAccount: example
+// BlobContainer: vhds
+// DeleteDanglingResourcesAfter: 20s
+// AdminUsername: crunch
+
+package azure
import (
"context"
+ "encoding/json"
"errors"
"flag"
"io/ioutil"
"net"
"net/http"
"os"
+ "testing"
"time"
+ "git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/config"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/to"
- "github.com/jmcvetta/randutil"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
)
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
type AzureInstanceSetSuite struct{}
var _ = check.Suite(&AzureInstanceSetSuite{})
type VirtualMachinesClientStub struct{}
-var testKey []byte = []byte(`ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDLQS1ExT2+WjA0d/hntEAyAtgeN1W2ik2QX8c2zO6HjlPHWXL92r07W0WMuDib40Pcevpi1BXeBWXA9ZB5KKMJB+ukaAu22KklnQuUmNvk6ZXnPKSkGxuCYvPQb08WhHf3p1VxiKfP3iauedBDM4x9/bkJohlBBQiFXzNUcQ+a6rKiMzmJN2gbL8ncyUzc+XQ5q4JndTwTGtOlzDiGOc9O4z5Dd76wtAVJneOuuNpwfFRVHThpJM6VThpCZOnl8APaceWXKeuwOuCae3COZMz++xQfxOfZ9Z8aIwo+TlQhsRaNfZ4Vjrop6ej8dtfZtgUFKfbXEOYaHrGrWGotFDTD example@example`)
+var testKey = []byte(`ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDLQS1ExT2+WjA0d/hntEAyAtgeN1W2ik2QX8c2zO6HjlPHWXL92r07W0WMuDib40Pcevpi1BXeBWXA9ZB5KKMJB+ukaAu22KklnQuUmNvk6ZXnPKSkGxuCYvPQb08WhHf3p1VxiKfP3iauedBDM4x9/bkJohlBBQiFXzNUcQ+a6rKiMzmJN2gbL8ncyUzc+XQ5q4JndTwTGtOlzDiGOc9O4z5Dd76wtAVJneOuuNpwfFRVHThpJM6VThpCZOnl8APaceWXKeuwOuCae3COZMz++xQfxOfZ9Z8aIwo+TlQhsRaNfZ4Vjrop6ej8dtfZtgUFKfbXEOYaHrGrWGotFDTD example@example`)
-func (*VirtualMachinesClientStub) CreateOrUpdate(ctx context.Context,
+func (*VirtualMachinesClientStub) createOrUpdate(ctx context.Context,
resourceGroupName string,
VMName string,
parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
return parameters, nil
}
-func (*VirtualMachinesClientStub) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
+func (*VirtualMachinesClientStub) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
return nil, nil
}
-func (*VirtualMachinesClientStub) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
+func (*VirtualMachinesClientStub) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
return compute.VirtualMachineListResultIterator{}, nil
}
type InterfacesClientStub struct{}
-func (*InterfacesClientStub) CreateOrUpdate(ctx context.Context,
+func (*InterfacesClientStub) createOrUpdate(ctx context.Context,
resourceGroupName string,
nicName string,
parameters network.Interface) (result network.Interface, err error) {
return parameters, nil
}
-func (*InterfacesClientStub) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
+func (*InterfacesClientStub) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
return nil, nil
}
-func (*InterfacesClientStub) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
+func (*InterfacesClientStub) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
return network.InterfaceListResultIterator{}, nil
}
+type testConfig struct {
+ ImageIDForTestSuite string
+ DriverParameters json.RawMessage
+}
+
var live = flag.String("live-azure-cfg", "", "Test with real azure API, provide config file")
-func GetInstanceSet() (InstanceSet, ImageID, arvados.Cluster, error) {
+func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error) {
cluster := arvados.Cluster{
InstanceTypes: arvados.InstanceTypeMap(map[string]arvados.InstanceType{
"tiny": arvados.InstanceType{
},
})}
if *live != "" {
- cfg := make(map[string]interface{})
- err := config.LoadFile(&cfg, *live)
+ var exampleCfg testConfig
+ err := config.LoadFile(&exampleCfg, *live)
if err != nil {
- return nil, ImageID(""), cluster, err
+ return nil, cloud.ImageID(""), cluster, err
}
- ap, err := NewAzureInstanceSet(cfg, "test123", logrus.StandardLogger())
- return ap, ImageID(cfg["image"].(string)), cluster, err
- } else {
- ap := AzureInstanceSet{
- azconfig: AzureInstanceSetConfig{
- BlobContainer: "vhds",
- },
- dispatcherID: "test123",
- namePrefix: "compute-test123-",
- logger: logrus.StandardLogger(),
- deleteNIC: make(chan string),
- deleteBlob: make(chan storage.Blob),
- }
- ap.ctx, ap.stopFunc = context.WithCancel(context.Background())
- ap.vmClient = &VirtualMachinesClientStub{}
- ap.netClient = &InterfacesClientStub{}
- return &ap, ImageID("blob"), cluster, nil
+
+ ap, err := newAzureInstanceSet(exampleCfg.DriverParameters, "test123", logrus.StandardLogger())
+ return ap, cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, err
}
+ ap := azureInstanceSet{
+ azconfig: azureInstanceSetConfig{
+ BlobContainer: "vhds",
+ },
+ dispatcherID: "test123",
+ namePrefix: "compute-test123-",
+ logger: logrus.StandardLogger(),
+ deleteNIC: make(chan string),
+ deleteBlob: make(chan storage.Blob),
+ }
+ ap.ctx, ap.stopFunc = context.WithCancel(context.Background())
+ ap.vmClient = &VirtualMachinesClientStub{}
+ ap.netClient = &InterfacesClientStub{}
+ return &ap, cloud.ImageID("blob"), cluster, nil
}
func (*AzureInstanceSetSuite) TestCreate(c *check.C) {
pk, _, _, _, err := ssh.ParseAuthorizedKey(testKey)
c.Assert(err, check.IsNil)
- nodetoken, err := randutil.String(40, "abcdefghijklmnopqrstuvwxyz0123456789")
- c.Assert(err, check.IsNil)
-
inst, err := ap.Create(cluster.InstanceTypes["tiny"],
img, map[string]string{
- "node-token": nodetoken},
- pk)
+ "TestTagName": "test tag value",
+ }, "umask 0600; echo -n test-file-data >/var/run/test-file", pk)
c.Assert(err, check.IsNil)
- tg := inst.Tags()
- log.Printf("Result %v %v %v", inst.String(), inst.Address(), tg)
+ tags := inst.Tags()
+ c.Check(tags["TestTagName"], check.Equals, "test tag value")
+ c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
}
c.Fatal("Error making provider", err)
}
- ap.(*AzureInstanceSet).ManageNics()
+ ap.(*azureInstanceSet).manageNics()
ap.Stop()
}
c.Fatal("Error making provider", err)
}
- ap.(*AzureInstanceSet).ManageBlobs()
+ ap.(*azureInstanceSet).manageBlobs()
ap.Stop()
}
c.Fatal("Error making provider", err)
}
- _, err = ap.(*AzureInstanceSet).netClient.Delete(context.Background(), "fakefakefake", "fakefakefake")
+ _, err = ap.(*azureInstanceSet).netClient.delete(context.Background(), "fakefakefake", "fakefakefake")
de, ok := err.(autorest.DetailedError)
if ok {
ServiceError: &azure.ServiceError{},
},
}
- wrapped := WrapAzureError(retryError)
- _, ok := wrapped.(RateLimitError)
+ wrapped := wrapAzureError(retryError)
+ _, ok := wrapped.(cloud.RateLimitError)
c.Check(ok, check.Equals, true)
quotaError := autorest.DetailedError{
},
},
}
- wrapped = WrapAzureError(quotaError)
- _, ok = wrapped.(QuotaError)
+ wrapped = wrapAzureError(quotaError)
+ _, ok = wrapped.(cloud.QuotaError)
c.Check(ok, check.Equals, true)
}
c.Assert(err, check.IsNil)
if len(l) > 0 {
-
sshclient, err := SetupSSHClient(c, l[0])
c.Assert(err, check.IsNil)
+ defer sshclient.Conn.Close()
sess, err := sshclient.NewSession()
c.Assert(err, check.IsNil)
-
- out, err := sess.Output("cat /home/crunch/node-token")
+ defer sess.Close()
+ _, err = sess.Output("find /var/run/test-file -maxdepth 0 -user root -perm 0600")
c.Assert(err, check.IsNil)
- log.Printf("%v", string(out))
-
- sshclient.Conn.Close()
+ sess, err = sshclient.NewSession()
+ c.Assert(err, check.IsNil)
+ defer sess.Close()
+ out, err := sess.Output("sudo cat /var/run/test-file")
+ c.Assert(err, check.IsNil)
+ c.Check(string(out), check.Equals, "test-file-data")
}
}
-func SetupSSHClient(c *check.C, inst Instance) (*ssh.Client, error) {
+func SetupSSHClient(c *check.C, inst cloud.Instance) (*ssh.Client, error) {
addr := inst.Address() + ":2222"
if addr == "" {
return nil, errors.New("instance has no address")
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package cloud
-
-import (
- "testing"
-
- check "gopkg.in/check.v1"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) {
- check.TestingT(t)
-}
package cloud
import (
+ "encoding/json"
+ "errors"
"io"
"time"
Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
}
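+// ErrNotImplemented may be returned by VerifyHostKey when the driver does not provide a host key verification mechanism.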
+var ErrNotImplemented = errors.New("not implemented")
+
// An ExecutorTarget is a remote command execution service.
type ExecutorTarget interface {
// SSH server hostname or IP address, or empty string if
// unknown while instance is booting.
Address() string
+ // Remote username to send during SSH authentication.
+ RemoteUser() string
+
// Return nil if the given public key matches the instance's
// SSH server key. If the provided Dialer is not nil,
// VerifyHostKey can use it to make outgoing network
// connections from the instance -- e.g., to use the cloud's
// "this instance's metadata" API.
+ //
+ // Return ErrNotImplemented if no verification mechanism is
+ // available.
VerifyHostKey(ssh.PublicKey, *ssh.Client) error
}
// All public methods of an InstanceSet, and all public methods of the
// instances it returns, are goroutine safe.
type InstanceSet interface {
- // Create a new instance. If supported by the driver, add the
+ // Create a new instance with the given type, image, and
+ // initial set of tags. If supported by the driver, add the
// provided public key to /root/.ssh/authorized_keys.
//
+ // The given InitCommand should be executed on the newly
+ // created instance. This is optional for a driver whose
+ // instances' VerifyHostKey() method never returns
+ // ErrNotImplemented. InitCommand will be under 1 KiB.
+ //
// The returned error should implement RateLimitError and
// QuotaError where applicable.
- Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+ Create(arvados.InstanceType, ImageID, InstanceTags, InitCommand, ssh.PublicKey) (Instance, error)
// Return all instances, including ones that are booting or
// shutting down. Optionally, filter out nodes that don't have
Stop()
}
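+// InitCommand is a shell command (under 1 KiB) to be executed on a newly created instance, typically via the cloud provider's boot-time custom data.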
+type InitCommand string
+
// A Driver returns an InstanceSet that uses the given InstanceSetID
// and driver-dependent configuration parameters.
//
//
// type exampleDriver struct {}
//
-// func (*exampleDriver) InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) {
+// func (*exampleDriver) InstanceSet(config json.RawMessage, id InstanceSetID) (InstanceSet, error) {
// var is exampleInstanceSet
-// if err := mapstructure.Decode(config, &is); err != nil {
+// if err := json.Unmarshal(config, &is); err != nil {
// return nil, err
// }
// is.ownID = id
//
// var _ = registerCloudDriver("example", &exampleDriver{})
type Driver interface {
- InstanceSet(config map[string]interface{}, id InstanceSetID, logger logrus.FieldLogger) (InstanceSet, error)
+ InstanceSet(config json.RawMessage, id InstanceSetID, logger logrus.FieldLogger) (InstanceSet, error)
}
// DriverFunc makes a Driver using the provided function as its
// InstanceSet method. This is similar to http.HandlerFunc.
-func DriverFunc(fn func(config map[string]interface{}, id InstanceSetID, logger logrus.FieldLogger) (InstanceSet, error)) Driver {
+func DriverFunc(fn func(config json.RawMessage, id InstanceSetID, logger logrus.FieldLogger) (InstanceSet, error)) Driver {
return driverFunc(fn)
}
-type driverFunc func(config map[string]interface{}, id InstanceSetID, logger logrus.FieldLogger) (InstanceSet, error)
+type driverFunc func(config json.RawMessage, id InstanceSetID, logger logrus.FieldLogger) (InstanceSet, error)
-func (df driverFunc) InstanceSet(config map[string]interface{}, id InstanceSetID, logger logrus.FieldLogger) (InstanceSet, error) {
+func (df driverFunc) InstanceSet(config json.RawMessage, id InstanceSetID, logger logrus.FieldLogger) (InstanceSet, error) {
return df(config, id, logger)
}
package controller
import (
+ "context"
+
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/service"
"git.curoverse.com/arvados.git/sdk/go/arvados"
var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
-func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+func newHandler(_ context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
return &Handler{Cluster: cluster, NodeProfile: np}
}
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/sirupsen/logrus"
var _ = check.Suite(&FederationSuite{})
type FederationSuite struct {
- log *logrus.Logger
+ log logrus.FieldLogger
// testServer and testHandler are the controller being tested,
// "zhome".
testServer *httpserver.Server
}
func (s *FederationSuite) SetUpTest(c *check.C) {
- s.log = logrus.New()
- s.log.Formatter = &logrus.JSONFormatter{}
- s.log.Out = &logWriter{c.Log}
+ s.log = ctxlog.TestLogger(c)
s.remoteServer = newServerFromIntegrationTestEnv(c)
c.Assert(s.remoteServer.Start(), check.IsNil)
func (s *FederationSuite) TestUpdateRemoteContainerRequest(c *check.C) {
defer s.localServiceReturns404(c).Close()
- req := httptest.NewRequest("PATCH", "/arvados/v1/container_requests/"+arvadostest.QueuedContainerRequestUUID,
- strings.NewReader(`{"container_request": {"priority": 696}}`))
- req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
- req.Header.Set("Content-type", "application/json")
- resp := s.testRequest(req)
- c.Check(resp.StatusCode, check.Equals, http.StatusOK)
- var cr arvados.ContainerRequest
- c.Check(json.NewDecoder(resp.Body).Decode(&cr), check.IsNil)
- c.Check(cr.UUID, check.Equals, arvadostest.QueuedContainerRequestUUID)
- c.Check(cr.Priority, check.Equals, 696)
+ setPri := func(pri int) {
+ req := httptest.NewRequest("PATCH", "/arvados/v1/container_requests/"+arvadostest.QueuedContainerRequestUUID,
+ strings.NewReader(fmt.Sprintf(`{"container_request": {"priority": %d}}`, pri)))
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ req.Header.Set("Content-type", "application/json")
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var cr arvados.ContainerRequest
+ c.Check(json.NewDecoder(resp.Body).Decode(&cr), check.IsNil)
+ c.Check(cr.UUID, check.Equals, arvadostest.QueuedContainerRequestUUID)
+ c.Check(cr.Priority, check.Equals, pri)
+ }
+ setPri(696)
+ setPri(1) // Reset fixture so side effect doesn't break other tests.
}
func (s *FederationSuite) TestCreateRemoteContainerRequest(c *check.C) {
h.handlerStack = mux
sc := *arvados.DefaultSecureClient
- sc.Timeout = time.Duration(h.Cluster.HTTPRequestTimeout)
sc.CheckRedirect = neverRedirect
h.secureClient = &sc
ic := *arvados.InsecureHTTPClient
- ic.Timeout = time.Duration(h.Cluster.HTTPRequestTimeout)
ic.CheckRedirect = neverRedirect
h.insecureClient = &ic
package controller
import (
+ "context"
"encoding/json"
"net/http"
"net/http/httptest"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
check "gopkg.in/check.v1"
)
type HandlerSuite struct {
cluster *arvados.Cluster
handler http.Handler
+ ctx context.Context
+ cancel context.CancelFunc
}
func (s *HandlerSuite) SetUpTest(c *check.C) {
+ s.ctx, s.cancel = context.WithCancel(context.Background())
+ s.ctx = ctxlog.Context(s.ctx, ctxlog.New(os.Stderr, "json", "debug"))
s.cluster = &arvados.Cluster{
ClusterID: "zzzzz",
PostgreSQL: integrationTestCluster().PostgreSQL,
},
}
node := s.cluster.NodeProfiles["*"]
- s.handler = newHandler(s.cluster, &node)
+ s.handler = newHandler(s.ctx, s.cluster, &node)
+}
+
+func (s *HandlerSuite) TearDownTest(c *check.C) {
+ s.cancel()
}
func (s *HandlerSuite) TestProxyDiscoveryDoc(c *check.C) {
package controller
import (
- "bytes"
"net/http"
"os"
"path/filepath"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
- "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
-// logWriter is an io.Writer that writes by calling a "write log"
-// function, typically (*check.C)Log().
-type logWriter struct {
- logfunc func(...interface{})
-}
-
-func (tl *logWriter) Write(buf []byte) (int, error) {
- tl.logfunc(string(bytes.TrimRight(buf, "\n")))
- return len(buf), nil
-}
-
func integrationTestCluster() *arvados.Cluster {
cfg, err := arvados.GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
if err != nil {
// Return a new unstarted controller server, using the Rails API
// provided by the integration-testing environment.
func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
- log := logrus.New()
- log.Formatter = &logrus.JSONFormatter{}
- log.Out = &logWriter{c.Log}
+ log := ctxlog.TestLogger(c)
nodeProfile := arvados.NodeProfile{
Controller: arvados.SystemServiceInstance{Listen: ":"},
package dispatchcloud
import (
+ "context"
+
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/service"
"git.curoverse.com/arvados.git/sdk/go/arvados"
var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
-func newHandler(cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
- d := &dispatcher{Cluster: cluster}
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
+ d := &dispatcher{Cluster: cluster, Context: ctx}
go d.Start()
return d
}
defer cq.mtx.Unlock()
ctr := cq.current[uuid].Container
if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
- delete(cq.current, uuid)
+ cq.delEnt(uuid, ctr.State)
}
}
cq.current[uuid] = cur
}
}
- for uuid := range cq.current {
+ for uuid, ent := range cq.current {
if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
// Don't expunge an entry that was
// added/updated locally after we started
// the poll response (evidently it's
// cancelled, completed, deleted, or taken by
// a different dispatcher).
- delete(cq.current, uuid)
+ cq.delEnt(uuid, ent.Container.State)
}
}
cq.dontupdate = nil
return nil
}
+// Caller must have lock.
+func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
+ cq.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": state,
+ }).Info("dropping container from queue")
+ delete(cq.current, uuid)
+}
+
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
it, err := cq.chooseType(&ctr)
if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
// error: it wouldn't help to try again, or to leave
// it for a different dispatcher process to attempt.
errorString := err.Error()
- cq.logger.WithField("ContainerUUID", ctr.UUID).Warn("cancel container with no suitable instance type")
+ logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
+ logger.WithError(err).Warn("cancel container with no suitable instance type")
go func() {
+ if ctr.State == arvados.ContainerStateQueued {
+ // Can't set runtime error without
+ // locking first. If Lock() is
+ // successful, it will call addEnt()
+ // again itself, and we'll fall
+ // through to the
+ // setRuntimeError/Cancel code below.
+ err := cq.Lock(ctr.UUID)
+ if err != nil {
+ logger.WithError(err).Warn("lock failed")
+ // ...and try again on the
+ // next Update, if the problem
+ // still exists.
+ }
+ return
+ }
var err error
defer func() {
if err == nil {
if latest.State == arvados.ContainerStateCancelled {
return
}
- cq.logger.WithField("ContainerUUID", ctr.UUID).WithError(err).Warn("error while trying to cancel unsatisfiable container")
+ logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
}()
- if ctr.State == arvados.ContainerStateQueued {
- err = cq.Lock(ctr.UUID)
- if err != nil {
- return
- }
- }
err = cq.setRuntimeError(ctr.UUID, errorString)
if err != nil {
return
}()
return
}
+ cq.logger.WithFields(logrus.Fields{
+ "ContainerUUID": ctr.UUID,
+ "State": ctr.State,
+ "Priority": ctr.Priority,
+ "InstanceType": it.Name,
+ }).Info("adding container to queue")
cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
}
package dispatchcloud
import (
+ "context"
"crypto/md5"
"encoding/json"
"fmt"
"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
type dispatcher struct {
Cluster *arvados.Cluster
+ Context context.Context
InstanceSetID cloud.InstanceSetID
logger logrus.FieldLogger
}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
- disp.logger = logrus.StandardLogger()
+ disp.logger = ctxlog.FromContext(disp.Context)
- if key, err := ssh.ParsePrivateKey(disp.Cluster.Dispatch.PrivateKey); err != nil {
+ if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
} else {
disp.sshKey = key
}
disp.instanceSet = instanceSet
disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+ disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
if disp.Cluster.ManagementToken == "" {
mux := httprouter.New()
mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
- mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/hold", disp.apiInstanceHold)
- mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/drain", disp.apiInstanceDrain)
- mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/run", disp.apiInstanceRun)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
ErrorLog: disp.logger,
})
if pollInterval <= 0 {
pollInterval = defaultPollInterval
}
- sched := scheduler.New(disp.logger, disp.queue, disp.pool, staleLockTimeout, pollInterval)
+ sched := scheduler.New(disp.Context, disp.queue, disp.pool, staleLockTimeout, pollInterval)
sched.Start()
defer sched.Stop()
}
func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
- params, _ := r.Context().Value(httprouter.ParamsKey).(httprouter.Params)
- id := cloud.InstanceID(params.ByName("instance_id"))
+ id := cloud.InstanceID(r.FormValue("instance_id"))
+ if id == "" {
+ httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+ return
+ }
err := disp.pool.SetIdleBehavior(id, want)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusNotFound)
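For illustration only (not part of this patch): with this change the management endpoints take the instance ID as a request parameter instead of a path segment. A minimal client sketch, assuming a hypothetical dispatcher address, instance ID, and that the management token is presented as a Bearer token:

package main

import (
	"log"
	"net/http"
	"net/url"
	"strings"
)

func main() {
	managementToken := "example-management-token"            // hypothetical
	form := url.Values{"instance_id": {"stub-type1-abcdef"}} // hypothetical instance ID
	req, err := http.NewRequest("POST",
		"https://dispatch.example:9006/arvados/v1/dispatch/instances/hold",
		strings.NewReader(form.Encode()))
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	req.Header.Set("Authorization", "Bearer "+managementToken)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println(resp.Status)
}

Because the handler reads the value with r.FormValue, either a urlencoded POST body (as above) or a query parameter would be accepted.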
package dispatchcloud
import (
+ "context"
"encoding/json"
"io/ioutil"
"math/rand"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&DispatcherSuite{})
type DispatcherSuite struct {
+ ctx context.Context
+ cancel context.CancelFunc
cluster *arvados.Cluster
stubDriver *test.StubDriver
disp *dispatcher
}
func (s *DispatcherSuite) SetUpTest(c *check.C) {
+ s.ctx, s.cancel = context.WithCancel(context.Background())
+ s.ctx = ctxlog.Context(s.ctx, ctxlog.TestLogger(c))
dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
c.Assert(err, check.IsNil)
TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
},
Dispatch: arvados.Dispatch{
- PrivateKey: dispatchprivraw,
+ PrivateKey: string(dispatchprivraw),
PollInterval: arvados.Duration(5 * time.Millisecond),
ProbeInterval: arvados.Duration(5 * time.Millisecond),
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
},
},
}
- s.disp = &dispatcher{Cluster: s.cluster}
+ s.disp = &dispatcher{
+ Cluster: s.cluster,
+ Context: s.ctx,
+ }
// Test cases can modify s.cluster before calling
// initialize(), and then modify private state before calling
// go run().
}
func (s *DispatcherSuite) TearDownTest(c *check.C) {
+ s.cancel()
s.disp.Close()
}
c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
}
- deadline := time.Now().Add(time.Second)
+ deadline := time.Now().Add(5 * time.Second)
for range time.NewTicker(10 * time.Millisecond).C {
insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
c.Check(err, check.IsNil)
"fmt"
"git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/lib/cloud/azure"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
)
var drivers = map[string]cloud.Driver{
- "azure": cloud.DriverFunc(cloud.NewAzureInstanceSet),
+ "azure": azure.Driver,
}
func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
// the size of the manifest.
//
// Use the following heuristic:
- // - Start with the length of the mainfest (n)
+ // - Start with the length of the manifest (n)
// - Subtract 80 characters for the filename and file segment
// - Divide by 42 to get the number of block identifiers ('hash\+size\ ' is 32+1+8+1)
// - Assume each block is full, multiply by 64 MiB
func (sch *Scheduler) fixStaleLocks() {
wp := sch.pool.Subscribe()
defer sch.pool.Unsubscribe(wp)
+
+ var stale []string
timeout := time.NewTimer(sch.staleLockTimeout)
waiting:
- for {
- unlock := false
- select {
- case <-wp:
- // If all workers have been contacted, unlock
- // containers that aren't claimed by any
- // worker.
- unlock = sch.pool.CountWorkers()[worker.StateUnknown] == 0
- case <-timeout.C:
- // Give up and unlock the containers, even
- // though they might be working.
- unlock = true
- }
-
+ for sch.pool.CountWorkers()[worker.StateUnknown] > 0 {
running := sch.pool.Running()
qEntries, _ := sch.queue.Entries()
+
+ stale = nil
for uuid, ent := range qEntries {
if ent.Container.State != arvados.ContainerStateLocked {
continue
if _, running := running[uuid]; running {
continue
}
- if !unlock {
- continue waiting
- }
- err := sch.queue.Unlock(uuid)
- if err != nil {
- sch.logger.Warnf("Unlock %s: %s", uuid, err)
- }
+ stale = append(stale, uuid)
+ }
+ if len(stale) == 0 {
+ return
+ }
+
+ select {
+ case <-wp:
+ case <-timeout.C:
+ // Give up.
+ break waiting
+ }
+
+ }
+
+ for _, uuid := range stale {
+ err := sch.queue.Unlock(uuid)
+ if err != nil {
+ sch.logger.Warnf("Unlock %s: %s", uuid, err)
}
- return
}
}
import (
"sort"
+ "time"
"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
"git.curoverse.com/arvados.git/sdk/go/arvados"
overquota = sorted[i:]
break tryrun
}
- sch.bgLock(logger, ctr.UUID)
+ go sch.lockContainer(logger, ctr.UUID)
unalloc[it]--
case arvados.ContainerStateLocked:
if unalloc[it] > 0 {
}
}
-// Start an API call to lock the given container, and return
-// immediately while waiting for the response in a new goroutine. Do
-// nothing if a lock request is already in progress for this
-// container.
-func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
- logger.Debug("locking")
- sch.mtx.Lock()
- defer sch.mtx.Unlock()
- if sch.locking[uuid] {
- logger.Debug("locking in progress, doing nothing")
+// Lock the given container. Should be called in a new goroutine.
+func (sch *Scheduler) lockContainer(logger logrus.FieldLogger, uuid string) {
+ if !sch.uuidLock(uuid, "lock") {
return
}
+ defer sch.uuidUnlock(uuid)
if ctr, ok := sch.queue.Get(uuid); !ok || ctr.State != arvados.ContainerStateQueued {
// This happens if the container has been cancelled or
// locked since runQueue called sch.queue.Entries(),
- // possibly by a bgLock() call from a previous
+ // possibly by a lockContainer() call from a previous
// runQueue iteration. In any case, we will respond
// appropriately on the next runQueue iteration, which
// will have already been triggered by the queue
logger.WithField("State", ctr.State).Debug("container no longer queued by the time we decided to lock it, doing nothing")
return
}
- sch.locking[uuid] = true
- go func() {
- defer func() {
- sch.mtx.Lock()
- defer sch.mtx.Unlock()
- delete(sch.locking, uuid)
- }()
- err := sch.queue.Lock(uuid)
- if err != nil {
- logger.WithError(err).Warn("error locking container")
- return
- }
- logger.Debug("lock succeeded")
- ctr, ok := sch.queue.Get(uuid)
- if !ok {
- logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
- } else if ctr.State != arvados.ContainerStateLocked {
- logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
- }
- }()
+ err := sch.queue.Lock(uuid)
+ if err != nil {
+ logger.WithError(err).Warn("error locking container")
+ return
+ }
+ logger.Debug("lock succeeded")
+ ctr, ok := sch.queue.Get(uuid)
+ if !ok {
+ logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
+ } else if ctr.State != arvados.ContainerStateLocked {
+ logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
+ }
+}
+
+// Acquire a non-blocking lock for the specified UUID, returning true if
+// successful. The op argument is used only for debug logs.
+//
+// If the lock is not available, uuidLock arranges to wake up the
+// scheduler after a short delay, so it can retry whatever operation
+// is trying to get the lock (if that operation is still worth doing).
+//
+// This mechanism helps avoid spamming the controller/database with
+// concurrent updates for any single container, even when the
+// scheduler loop is running frequently.
+func (sch *Scheduler) uuidLock(uuid, op string) bool {
+ sch.mtx.Lock()
+ defer sch.mtx.Unlock()
+ logger := sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "Op": op,
+ })
+ if op, locked := sch.uuidOp[uuid]; locked {
+ logger.Debugf("uuidLock not available, Op=%s in progress", op)
+ // Make sure the scheduler loop wakes up to retry.
+ sch.wakeup.Reset(time.Second / 4)
+ return false
+ }
+ logger.Debug("uuidLock acquired")
+ sch.uuidOp[uuid] = op
+ return true
+}
+
+func (sch *Scheduler) uuidUnlock(uuid string) {
+ sch.mtx.Lock()
+ defer sch.mtx.Unlock()
+ delete(sch.uuidOp, uuid)
}
package scheduler
import (
+ "context"
"sync"
"time"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
)
// immediately. Don't try to create any other nodes after the failed
// create.
func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
queue := test.Queue{
ChooseType: chooseType,
Containers: []arvados.Container{
running: map[string]time.Time{},
canCreate: 0,
}
- New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
c.Check(pool.running, check.HasLen, 1)
// If Create() fails, shutdown some nodes, and don't call Create()
// again. Don't call Create() at all if AtQuota() is true.
func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
for quota := 0; quota < 2; quota++ {
c.Logf("quota=%d", quota)
shouldCreate := []arvados.InstanceType{}
starts: []string{},
canCreate: 0,
}
- New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, shouldCreate)
c.Check(pool.starts, check.DeepEquals, []string{})
c.Check(pool.shutdowns, check.Not(check.Equals), 0)
// Start lower-priority containers while waiting for new/existing
// workers to come up for higher-priority containers.
func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
pool := stubPool{
unalloc: map[arvados.InstanceType]int{
test.InstanceType(1): 2,
},
}
queue.Update()
- New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
running := map[string]bool{}
package scheduler
import (
+ "context"
"sync"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/sirupsen/logrus"
)
staleLockTimeout time.Duration
queueUpdateInterval time.Duration
- locking map[string]bool
- mtx sync.Mutex
+ uuidOp map[string]string // operation in progress: "lock", "cancel", ...
+ mtx sync.Mutex
+ wakeup *time.Timer
runOnce sync.Once
stop chan struct{}
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
return &Scheduler{
- logger: logger,
+ logger: ctxlog.FromContext(ctx),
queue: queue,
pool: pool,
staleLockTimeout: staleLockTimeout,
queueUpdateInterval: queueUpdateInterval,
+ wakeup: time.NewTimer(time.Second),
stop: make(chan struct{}),
stopped: make(chan struct{}),
- locking: map[string]bool{},
+ uuidOp: map[string]string{},
}
}
// Ensure the queue is fetched once before attempting anything.
for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
sch.logger.Errorf("error updating queue: %s", err)
- d := sch.queueUpdateInterval / 60
+ d := sch.queueUpdateInterval / 10
+ if d < time.Second {
+ d = time.Second
+ }
sch.logger.Infof("waiting %s before retry", d)
time.Sleep(d)
}
return
case <-queueNotify:
case <-poolNotify:
+ case <-sch.wakeup.C:
}
}
}
import (
"fmt"
- "time"
"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
"git.curoverse.com/arvados.git/sdk/go/arvados"
// cancelled.
func (sch *Scheduler) sync() {
running := sch.pool.Running()
- cancel := func(ent container.QueueEnt, reason string) {
- uuid := ent.Container.UUID
- logger := sch.logger.WithField("ContainerUUID", uuid)
- logger.Infof("cancelling container because %s", reason)
- err := sch.queue.Cancel(uuid)
- if err != nil {
- logger.WithError(err).Print("error cancelling container")
- }
- }
- kill := func(ent container.QueueEnt, reason string) {
- uuid := ent.Container.UUID
- logger := sch.logger.WithField("ContainerUUID", uuid)
- logger.Debugf("killing crunch-run process because %s", reason)
- sch.pool.KillContainer(uuid)
- }
qEntries, qUpdated := sch.queue.Entries()
for uuid, ent := range qEntries {
exited, running := running[uuid]
switch ent.Container.State {
case arvados.ContainerStateRunning:
if !running {
- go cancel(ent, "not running on any worker")
+ go sch.cancel(ent, "not running on any worker")
} else if !exited.IsZero() && qUpdated.After(exited) {
- go cancel(ent, "state=\"Running\" after crunch-run exited")
+ go sch.cancel(ent, "state=\"Running\" after crunch-run exited")
} else if ent.Container.Priority == 0 {
- go kill(ent, fmt.Sprintf("priority=%d", ent.Container.Priority))
+ go sch.kill(ent, "priority=0")
}
case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
if running {
// of kill() will be to make the
// worker available for the next
// container.
- go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
} else {
sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
// a network outage and is still
// preparing to run a container that
// has already been unlocked/requeued.
- go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
}
case arvados.ContainerStateLocked:
if running && !exited.IsZero() && qUpdated.After(exited) {
- logger := sch.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "Exited": time.Since(exited).Seconds(),
- })
- logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
- err := sch.queue.Unlock(uuid)
- if err != nil {
- logger.WithError(err).Info("error requeueing container")
- }
+ go sch.requeue(ent, "crunch-run exited")
+ } else if running && exited.IsZero() && ent.Container.Priority == 0 {
+ go sch.kill(ent, "priority=0")
+ } else if !running && ent.Container.Priority == 0 {
+ go sch.requeue(ent, "priority=0")
}
default:
- sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
+ sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": ent.Container.State,
+ }).Error("BUG: unexpected state")
}
}
}
+
+func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
+ uuid := ent.Container.UUID
+ if !sch.uuidLock(uuid, "cancel") {
+ return
+ }
+ defer sch.uuidUnlock(uuid)
+ logger := sch.logger.WithField("ContainerUUID", uuid)
+ logger.Infof("cancelling container because %s", reason)
+ err := sch.queue.Cancel(uuid)
+ if err != nil {
+ logger.WithError(err).Print("error cancelling container")
+ }
+}
+
+func (sch *Scheduler) kill(ent container.QueueEnt, reason string) {
+ uuid := ent.Container.UUID
+ logger := sch.logger.WithField("ContainerUUID", uuid)
+ logger.Debugf("killing crunch-run process because %s", reason)
+ sch.pool.KillContainer(uuid)
+}
+
+func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
+ uuid := ent.Container.UUID
+ if !sch.uuidLock(uuid, "requeue") {
+ return
+ }
+ defer sch.uuidUnlock(uuid)
+ logger := sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": ent.Container.State,
+ "Priority": ent.Container.Priority,
+ })
+ logger.Infof("requeueing locked container because %s", reason)
+ err := sch.queue.Unlock(uuid)
+ if err != nil {
+ logger.WithError(err).Error("error requeueing container")
+ }
+}
type Executor struct {
target cloud.ExecutorTarget
targetPort string
+ targetUser string
signers []ssh.Signer
mtx sync.RWMutex // controls access to instance after creation
if h, p, err := net.SplitHostPort(addr); err != nil || p == "" {
// Target address does not specify a port. Use
// targetPort, or "ssh".
+ if h == "" {
+ h = addr
+ }
if p = exr.targetPort; p == "" {
p = "ssh"
}
}
var receivedKey ssh.PublicKey
client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
- User: "root",
+ User: target.RemoteUser(),
Auth: []ssh.AuthMethod{
ssh.PublicKeys(exr.signers...),
},
return 0
},
HostKey: hostpriv,
+ AuthorizedUser: "username",
AuthorizedKeys: []ssh.PublicKey{clientpub},
},
}
return uint32(exitcode)
},
HostKey: hostpriv,
+ AuthorizedUser: "username",
AuthorizedKeys: []ssh.PublicKey{clientpub},
},
}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package test
-
-import (
- "os"
-
- "github.com/sirupsen/logrus"
-)
-
-func Logger() logrus.FieldLogger {
- logger := logrus.StandardLogger()
- if os.Getenv("ARVADOS_DEBUG") != "" {
- logger.SetLevel(logrus.DebugLevel)
- }
- return logger
-}
type SSHService struct {
Exec SSHExecFunc
HostKey ssh.Signer
+ AuthorizedUser string
AuthorizedKeys []ssh.PublicKey
listener net.Listener
return ln.Addr().String()
}
+// RemoteUser returns the username that will be accepted.
+func (ss *SSHService) RemoteUser() string {
+ return ss.AuthorizedUser
+}
+
// Close shuts down the server and releases resources. Established
// connections are unaffected.
func (ss *SSHService) Close() {
}
config.AddHostKey(ss.HostKey)
- listener, err := net.Listen("tcp", ":")
+ listener, err := net.Listen("tcp", "127.0.0.1:")
if err != nil {
ss.err = err
return
import (
"crypto/rand"
+ "encoding/json"
"errors"
"fmt"
"io"
+ "io/ioutil"
math_rand "math/rand"
"regexp"
"strings"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/mitchellh/mapstructure"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
}
// InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
if sd.holdCloudOps == nil {
sd.holdCloudOps = make(chan bool)
}
sis := StubInstanceSet{
driver: sd,
+ logger: logger,
servers: map[cloud.InstanceID]*StubVM{},
}
sd.instanceSets = append(sd.instanceSets, &sis)
- return &sis, mapstructure.Decode(params, &sis)
+
+ var err error
+ if params != nil {
+ err = json.Unmarshal(params, &sis)
+ }
+ return &sis, err
}
// InstanceSets returns all instances that have been created by the
type StubInstanceSet struct {
driver *StubDriver
+ logger logrus.FieldLogger
servers map[cloud.InstanceID]*StubVM
mtx sync.RWMutex
stopped bool
allowInstancesCall time.Time
}
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
if sis.driver.HoldCloudOps {
sis.driver.holdCloudOps <- true
}
id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
tags: copyTags(tags),
providerType: it.ProviderType,
+ initCommand: cmd,
}
svm.SSHService = SSHService{
HostKey: sis.driver.HostKey,
+ AuthorizedUser: "root",
AuthorizedKeys: ak,
Exec: svm.Exec,
}
sis *StubInstanceSet
id cloud.InstanceID
tags cloud.InstanceTags
+ initCommand cloud.InitCommand
providerType string
SSHService SSHService
running map[string]bool
}
func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+ stdinData, err := ioutil.ReadAll(stdin)
+ if err != nil {
+ fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
+ return 1
+ }
queue := svm.sis.driver.Queue
uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
if eta := svm.Boot.Sub(time.Now()); eta > 0 {
fmt.Fprint(stderr, "crunch-run: command not found\n")
return 1
}
- if strings.HasPrefix(command, "crunch-run --detach ") {
+ if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
+ var stdinKV map[string]string
+ err := json.Unmarshal(stdinData, &stdinKV)
+ if err != nil {
+ fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
+ return 1
+ }
for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
- if env[name] == "" {
- fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
+ if stdinKV[name] == "" {
+ fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
return 1
}
}
svm.Unlock()
time.Sleep(svm.CrunchRunDetachDelay)
fmt.Fprintf(stderr, "starting %s\n", uuid)
- logger := logrus.WithFields(logrus.Fields{
+ logger := svm.sis.logger.WithFields(logrus.Fields{
"Instance": svm.id,
"ContainerUUID": uuid,
})
return si.addr
}
+func (si stubInstance) RemoteUser() string {
+ return si.svm.SSHService.AuthorizedUser
+}
+
func (si stubInstance) Destroy() error {
sis := si.svm.sis
if sis.driver.HoldCloudOps {
package worker
import (
+ "crypto/rand"
"errors"
+ "fmt"
"io"
"sort"
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
)
const (
- tagKeyInstanceType = "InstanceType"
- tagKeyIdleBehavior = "IdleBehavior"
+ tagKeyInstanceType = "InstanceType"
+ tagKeyIdleBehavior = "IdleBehavior"
+ tagKeyInstanceSecret = "InstanceSecret"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
Instance cloud.InstanceID `json:"instance"`
+ Address string `json:"address"`
Price float64 `json:"price"`
ArvadosInstanceType string `json:"arvados_instance_type"`
ProviderInstanceType string `json:"provider_instance_type"`
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
arvClient: arvClient,
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ installPublicKey: installPublicKey,
stop: make(chan bool),
}
wp.registerMetrics(reg)
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
+ installPublicKey ssh.PublicKey
// private state
subscribers map[<-chan struct{}]chan<- struct{}
throttleCreate throttle
throttleInstances throttle
- mInstances prometheus.Gauge
- mInstancesPrice prometheus.Gauge
mContainersRunning prometheus.Gauge
- mVCPUs prometheus.Gauge
- mVCPUsInuse prometheus.Gauge
- mMemory prometheus.Gauge
- mMemoryInuse prometheus.Gauge
+ mInstances *prometheus.GaugeVec
+ mInstancesPrice *prometheus.GaugeVec
+ mVCPUs *prometheus.GaugeVec
+ mMemory *prometheus.GaugeVec
}
// Subscribe returns a buffered channel that becomes ready after any
if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
return false
}
- tags := cloud.InstanceTags{
- tagKeyInstanceType: it.Name,
- tagKeyIdleBehavior: string(IdleBehaviorRun),
- }
now := time.Now()
wp.creating[it] = append(wp.creating[it], now)
go func() {
defer wp.notify()
- inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+ secret := randomHex(instanceSecretLength)
+ tags := cloud.InstanceTags{
+ tagKeyInstanceType: it.Name,
+ tagKeyIdleBehavior: string(IdleBehaviorRun),
+ tagKeyInstanceSecret: secret,
+ }
+ initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+ inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
// Remove our timestamp marker from wp.creating
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+ inst = tagVerifier{inst}
id := inst.ID()
if wkr := wp.workers[id]; wkr != nil {
wkr.executor.SetTarget(inst)
logger := wp.logger.WithFields(logrus.Fields{
"InstanceType": it.Name,
- "Instance": inst,
+ "Instance": inst.ID(),
+ "Address": inst.Address(),
})
logger.WithFields(logrus.Fields{
"State": initialState,
func (wp *Pool) kill(wkr *worker, uuid string) {
logger := wp.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
- "Instance": wkr.instance,
+ "Instance": wkr.instance.ID(),
})
logger.Debug("killing process")
- stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
+ cmd := "crunch-run --kill 15 " + uuid
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
logger.WithFields(logrus.Fields{
"stderr": string(stderr),
if reg == nil {
reg = prometheus.NewRegistry()
}
- wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "instances_total",
- Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
- })
- reg.MustRegister(wp.mInstances)
- wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "instances_price_total",
- Help: "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
- })
- reg.MustRegister(wp.mInstancesPrice)
wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Help: "Number of containers reported running by cloud VMs.",
})
reg.MustRegister(wp.mContainersRunning)
-
- wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_total",
+ Help: "Number of cloud VMs.",
+ }, []string{"category"})
+ reg.MustRegister(wp.mInstances)
+ wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_price",
+ Help: "Price of cloud VMs.",
+ }, []string{"category"})
+ reg.MustRegister(wp.mInstancesPrice)
+ wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "vcpus_total",
Help: "Total VCPUs on all cloud VMs.",
- })
+ }, []string{"category"})
reg.MustRegister(wp.mVCPUs)
- wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "vcpus_inuse",
- Help: "VCPUs on cloud VMs that are running containers.",
- })
- reg.MustRegister(wp.mVCPUsInuse)
- wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "memory_bytes_total",
Help: "Total memory on all cloud VMs.",
- })
+ }, []string{"category"})
reg.MustRegister(wp.mMemory)
- wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "memory_bytes_inuse",
- Help: "Memory on cloud VMs that are running containers.",
- })
- reg.MustRegister(wp.mMemoryInuse)
}
func (wp *Pool) runMetrics() {
wp.mtx.RLock()
defer wp.mtx.RUnlock()
- var price float64
- var alloc, cpu, cpuInuse, mem, memInuse int64
+ instances := map[string]int64{}
+ price := map[string]float64{}
+ cpu := map[string]int64{}
+ mem := map[string]int64{}
+ var running int64
for _, wkr := range wp.workers {
- price += wkr.instType.Price
- cpu += int64(wkr.instType.VCPUs)
- mem += int64(wkr.instType.RAM)
- if len(wkr.running)+len(wkr.starting) == 0 {
- continue
+ var cat string
+ switch {
+ case len(wkr.running)+len(wkr.starting) > 0:
+ cat = "inuse"
+ case wkr.idleBehavior == IdleBehaviorHold:
+ cat = "hold"
+ case wkr.state == StateBooting:
+ cat = "booting"
+ case wkr.state == StateUnknown:
+ cat = "unknown"
+ default:
+ cat = "idle"
}
- alloc += int64(len(wkr.running) + len(wkr.starting))
- cpuInuse += int64(wkr.instType.VCPUs)
- memInuse += int64(wkr.instType.RAM)
- }
- wp.mInstances.Set(float64(len(wp.workers)))
- wp.mInstancesPrice.Set(price)
- wp.mContainersRunning.Set(float64(alloc))
- wp.mVCPUs.Set(float64(cpu))
- wp.mMemory.Set(float64(mem))
- wp.mVCPUsInuse.Set(float64(cpuInuse))
- wp.mMemoryInuse.Set(float64(memInuse))
+ instances[cat]++
+ price[cat] += wkr.instType.Price
+ cpu[cat] += int64(wkr.instType.VCPUs)
+ mem[cat] += int64(wkr.instType.RAM)
+ running += int64(len(wkr.running) + len(wkr.starting))
+ }
+ for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
+ wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
+ wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
+ wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
+ wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+ }
+ wp.mContainersRunning.Set(float64(running))
}
func (wp *Pool) runProbes() {
for _, w := range wp.workers {
r = append(r, InstanceView{
Instance: w.instance.ID(),
+ Address: w.instance.Address(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
continue
}
logger := wp.logger.WithFields(logrus.Fields{
- "Instance": wkr.instance,
+ "Instance": wkr.instance.ID(),
"WorkerState": wkr.state,
})
logger.Info("instance disappeared in cloud")
go wp.notify()
}
}
+
+// Return a random string of n hexadecimal digits (n*4 random bits). n
+// must be even.
+func randomHex(n int) string {
+ buf := make([]byte, n/2)
+ _, err := rand.Read(buf)
+ if err != nil {
+ panic(err)
+ }
+ return fmt.Sprintf("%x", buf)
+}
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
}
}
- logger := test.Logger()
+ logger := ctxlog.TestLogger(c)
driver := &test.StubDriver{}
is, err := driver.InstanceSet(nil, "", logger)
c.Assert(err, check.IsNil)
},
}
- pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+ pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
notify := pool.Subscribe()
defer pool.Unsubscribe(notify)
pool.Create(type1)
c.Log("------- starting new pool, waiting to recover state")
- pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+ pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
notify2 := pool2.Subscribe()
defer pool2.Unsubscribe(notify2)
waitForIdle(pool2, notify2)
}
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
- logger := test.Logger()
+ logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}
instanceSet, err := driver.InstanceSet(nil, "", logger)
c.Assert(err, check.IsNil)
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "golang.org/x/crypto/ssh"
+)
+
+var (
+ errBadInstanceSecret = errors.New("bad instance secret")
+
+ // filename on instance, as given to shell (quoted accordingly)
+ instanceSecretFilename = "/var/run/arvados-instance-secret"
+ instanceSecretLength = 40 // hex digits
+)
+
+type tagVerifier struct {
+ cloud.Instance
+}
+
+func (tv tagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) error {
+ expectSecret := tv.Instance.Tags()[tagKeyInstanceSecret]
+ if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || expectSecret == "" {
+ // If the wrapped instance indicates it has a way to
+ // verify the key, return that decision.
+ return err
+ }
+ session, err := client.NewSession()
+ if err != nil {
+ return err
+ }
+ defer session.Close()
+ var stdout, stderr bytes.Buffer
+ session.Stdin = bytes.NewBuffer(nil)
+ session.Stdout = &stdout
+ session.Stderr = &stderr
+ cmd := fmt.Sprintf("cat %s", instanceSecretFilename)
+ if u := tv.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
+ err = session.Run(cmd)
+ if err != nil {
+ return err
+ }
+ if stdout.String() != expectSecret {
+ return errBadInstanceSecret
+ }
+ return nil
+}
import (
"bytes"
+ "encoding/json"
"fmt"
"strings"
"sync"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
"github.com/sirupsen/logrus"
)
"ContainerUUID": ctr.UUID,
"Priority": ctr.Priority,
})
- logger = logger.WithField("Instance", wkr.instance)
+ logger = logger.WithField("Instance", wkr.instance.ID())
logger.Debug("starting container")
wkr.starting[ctr.UUID] = struct{}{}
wkr.state = StateRunning
"ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
}
- stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
+ if wkr.wp.arvClient.Insecure {
+ env["ARVADOS_API_HOST_INSECURE"] = "1"
+ }
+ envJSON, err := json.Marshal(env)
+ if err != nil {
+ panic(err)
+ }
+ stdin := bytes.NewBuffer(envJSON)
+ cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
now := time.Now()
func (wkr *worker) probeRunning() (running []string, ok bool) {
cmd := "crunch-run --list"
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
wkr.logger.WithFields(logrus.Fields{
"State": wkr.state,
- "Age": age,
+ "IdleDuration": stats.Duration(age),
"IdleBehavior": wkr.idleBehavior,
}).Info("shutdown idle worker")
wkr.shutdown()
// match. Caller must have lock.
func (wkr *worker) saveTags() {
instance := wkr.instance
- have := instance.Tags()
- want := cloud.InstanceTags{
+ tags := instance.Tags()
+ update := cloud.InstanceTags{
tagKeyInstanceType: wkr.instType.Name,
tagKeyIdleBehavior: string(wkr.idleBehavior),
}
- go func() {
- for k, v := range want {
- if v == have[k] {
- continue
- }
- err := instance.SetTags(want)
+ save := false
+ for k, v := range update {
+ if tags[k] != v {
+ tags[k] = v
+ save = true
+ }
+ }
+ if save {
+ go func() {
+ err := instance.SetTags(tags)
if err != nil {
- wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+ wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
}
- break
-
- }
- }()
+ }()
+ }
}
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
)
type WorkerSuite struct{}
func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
- logger := test.Logger()
+ logger := ctxlog.TestLogger(c)
bootTimeout := time.Minute
probeTimeout := time.Second
is, err := (&test.StubDriver{}).InstanceSet(nil, "", logger)
c.Assert(err, check.IsNil)
- inst, err := is.Create(arvados.InstanceType{}, "", nil, nil)
+ inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
c.Assert(err, check.IsNil)
type trialT struct {
package service
import (
+ "context"
"flag"
"fmt"
"io"
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"github.com/coreos/go-systemd/daemon"
"github.com/sirupsen/logrus"
CheckHealth() error
}
-type NewHandlerFunc func(*arvados.Cluster, *arvados.NodeProfile) Handler
+type NewHandlerFunc func(context.Context, *arvados.Cluster, *arvados.NodeProfile) Handler
type command struct {
newHandler NewHandlerFunc
}
func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
- log := logrus.New()
- log.Formatter = &logrus.JSONFormatter{
- TimestampFormat: rfc3339NanoFixed,
- }
- log.Out = stderr
+ log := ctxlog.New(stderr, "json", "info")
var err error
defer func() {
if err != nil {
return 1
}
+ log = ctxlog.New(stderr, cluster.Logging.Format, cluster.Logging.Level).WithFields(logrus.Fields{
+ "PID": os.Getpid(),
+ })
+ ctx := ctxlog.Context(context.Background(), log)
profileName := *nodeProfile
if profileName == "" {
profileName = os.Getenv("ARVADOS_NODE_PROFILE")
err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName)
return 1
}
- handler := c.newHandler(cluster, profile)
+ handler := c.newHandler(ctx, cluster, profile)
if err = handler.CheckHealth(); err != nil {
return 1
}
s.executables << "arv-crunch-job"
s.executables << "arv-tag"
s.required_ruby_version = '>= 2.1.0'
- s.add_runtime_dependency 'arvados', '~> 1.2.0', '>= 1.2.0'
+ s.add_runtime_dependency 'arvados', '~> 1.3.0', '>= 1.3.0'
# Our google-api-client dependency used to be < 0.9, but that could be
# satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
s.add_runtime_dependency 'cure-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
- except Exception as e:
- logger.error(e)
+ except Exception:
+ logger.exception("Error creating the Arvados CWL Executor")
return 1
+ # Note that unless in debug mode, some stack traces related to user
+ # workflow errors may be suppressed. See ArvadosJob.done().
if arvargs.debug:
logger.setLevel(logging.DEBUG)
logging.getLogger('arvados').setLevel(logging.DEBUG)
logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
else:
logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
- except Exception as e:
- logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
+ except Exception:
+ logger.exception("%s got an error", self.arvrunner.label(self))
self.output_callback({}, "permanentFail")
def done(self, record):
if container["output"]:
outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
except WorkflowException as e:
+ # Only include a stack trace if in debug mode.
+ # A stack trace may obscure more useful output about the workflow.
logger.error("%s unable to collect output from %s:\n%s",
self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
- except Exception as e:
- logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
+ except Exception:
+ logger.exception("%s while getting output object:", self.arvrunner.label(self))
processStatus = "permanentFail"
finally:
self.output_callback(outputs, processStatus)
extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
if runtimeContext.submit_request_uuid:
+ if "cluster_id" in extra_submit_params:
+ # cluster_id is not applicable to an "update" call and would cause it to fail
+ del extra_submit_params["cluster_id"]
response = self.arvrunner.api.container_requests().update(
uuid=runtimeContext.submit_request_uuid,
body=job_spec,
container = self.arvrunner.api.containers().get(
uuid=record["container_uuid"]
).execute(num_retries=self.arvrunner.num_retries)
- except Exception as e:
- logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e)
+ except Exception:
+ logger.exception("%s while getting runner container", self.arvrunner.label(self))
self.arvrunner.output_callback({}, "permanentFail")
else:
super(RunnerContainer, self).done(container)
arvados.commands.put.api_client = api_client
arvados.commands.keepdocker.main(args, stdout=sys.stderr, install_sig_handlers=False, api=api_client)
except SystemExit as e:
+ # If e.code is None or zero, then keepdocker exited normally and we can continue
if e.code:
raise WorkflowException("keepdocker exited with code %s" % e.code)
e)
else:
logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
- except Exception as e:
+ except Exception:
logger.exception("%s error" % (self.arvrunner.label(self)))
self.output_callback({}, "permanentFail")
body={
"components": components
}).execute(num_retries=self.arvrunner.num_retries)
- except Exception as e:
- logger.info("Error adding to components: %s", e)
+ except Exception:
+ logger.exception("Error adding to components")
def done(self, record):
try:
outputs = done.done(self, record, dirs["tmpdir"],
dirs["outdir"], dirs["keep"])
except WorkflowException as e:
+ # Only include a stack trace if in debug mode.
+ # This is most likely a user workflow error and a stack trace may obscure more useful output.
logger.error("%s unable to collect output from %s:\n%s",
self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
- except Exception as e:
+ except Exception:
logger.exception("Got unknown exception while collecting output for job %s:", self.name)
processStatus = "permanentFail"
keys = keys[pageSize:]
try:
proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.warning("Error checking states on API server: %s", e)
+ except Exception:
+ logger.exception("Error checking states on API server: %s")
remain_wait = self.poll_interval
continue
for i in self.intermediate_output_collections:
try:
self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
- except:
+ except Exception:
logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
- if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+ except (KeyboardInterrupt, SystemExit):
break
def check_features(self, obj):
body={
'is_trashed': True
}).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.info("Setting container output: %s", e)
+ except Exception:
+ logger.exception("Setting container output")
+ return
elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
body={
except:
if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
logger.error("Interrupted, workflow will be cancelled")
+ elif isinstance(sys.exc_info()[1], WorkflowException):
+ logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
else:
- logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ logger.exception("Workflow execution failed")
+
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
return True
except arvados.errors.NotFoundError:
return False
- except:
- logger.exception("Got unexpected exception checking if file exists:")
+ except Exception:
+ logger.exception("Got unexpected exception checking if file exists")
return False
return super(CollectionFetcher, self).check_exists(url)
count = 0
start = time.time()
checkpoint = start
- with c.open(name, "w") as f:
+ with c.open(name, "wb") as f:
for chunk in req.iter_content(chunk_size=1024):
count += len(chunk)
f.write(chunk)
from future.utils import viewvalues, viewitems
import os
+import sys
import urllib.parse
from functools import partial
import logging
import json
-import subprocess32 as subprocess
from collections import namedtuple
-
from io import StringIO
+if os.name == "posix" and sys.version_info[0] < 3:
+ import subprocess32 as subprocess
+else:
+ import subprocess
+
from schema_salad.sourceline import SourceLine, cmap
from cwltool.command_line_tool import CommandLineTool
fileobj["location"] = "keep:%s/%s" % (record["output"], path)
adjustFileObjs(outputs, keepify)
adjustDirObjs(outputs, keepify)
- except Exception as e:
- logger.exception("[%s] While getting final output object: %s", self.name, e)
+ except Exception:
+ logger.exception("[%s] While getting final output object", self.name)
self.arvrunner.output_callback({}, "permanentFail")
else:
self.arvrunner.output_callback(outputs, processStatus)
'schema-salad==3.0.20181129082112',
'typing >= 3.6.4',
'ruamel.yaml >=0.15.54, <=0.15.77',
- 'arvados-python-client>=1.2.1.20181130020805',
+ 'arvados-python-client>=1.3.0.20190205182514',
'setuptools',
'ciso8601 >=1.0.6, <2.0.0',
- 'subprocess32>=3.5.1',
],
+ extras_require={
+ ':os.name=="posix" and python_version<"3"': ['subprocess32 >= 3.5.1'],
+ },
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
],
getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
- cm.open.assert_called_with("file1.txt", "w")
+ cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt",
owner_uuid=None, ensure_unique_name=True)
getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
- cm.open.assert_called_with("file1.txt", "w")
+ cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt",
owner_uuid=None, ensure_unique_name=True)
getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True)
- cm.open.assert_called_with("file1.txt", "w")
+ cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http://example.com/download?fn=/file1.txt",
owner_uuid=None, ensure_unique_name=True)
body=JsonDiffMatcher(expect_pipeline))
@stubs
- def test_submit_container(self, stubs):
+ def test_submit_container(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_container_request_uuid + '\n')
+ stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
@stubs
["--submit", "--no-wait", "--api=containers", "--debug", "--on-error=stop",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
-
+
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
@stubs
def test_submit_container_output_name(self, stubs):
output_name = "test_output_name"
-
+
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--output-name", output_name,
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
self.assertEqual(exited, 0)
@stubs
- def test_submit_storage_classes(self, stubs):
+ def test_submit_storage_classes(self, stubs):
exited = arvados_cwl.main(
["--debug", "--submit", "--no-wait", "--api=containers", "--storage-classes=foo",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
["--submit", "--no-wait", "--api=containers", "--debug", "--trash-intermediate",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
-
+
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
stubs.api.container_requests().update.assert_called_with(
- uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec), cluster_id="zzzzz")
+ uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec))
self.assertEqual(stubs.capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
self.assertEqual(stubs.capture_stdout.getvalue(),
self.existing_template_uuid + '\n')
self.assertEqual(exited, 0)
-
+
class TestCreateWorkflow(unittest.TestCase):
existing_workflow_uuid = "zzzzz-7fd4e-validworkfloyml"
RemoteClusters map[string]RemoteCluster
PostgreSQL PostgreSQL
RequestLimits RequestLimits
+ Logging Logging
+}
+
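+// Logging configures the log level and format ("json" or "text").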
+type Logging struct {
+ Level string
+ Format string
}
type PostgreSQL struct {
type Dispatch struct {
// PEM encoded SSH key (RSA, DSA, or ECDSA) able to log in to
// cloud VMs.
- PrivateKey []byte
+ PrivateKey string
// Max time for workers to come up before abandoning stale
// locks from previous run
ImageID string
Driver string
- DriverParameters map[string]interface{}
+ DriverParameters json.RawMessage
}
type InstanceTypeMap map[string]InstanceType
if _, ok := (*it)[t.Name]; ok {
return errDuplicateInstanceTypeName
}
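+ // Default ProviderType to the type's configured name when not given explicitly.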
+ if t.ProviderType == "" {
+ t.ProviderType = t.Name
+ }
(*it)[t.Name] = t
}
return nil
if err != nil {
return err
}
- // Fill in Name field using hash key.
+ // Fill in Name field (and ProviderType field, if not
+ // specified) using hash key.
*it = InstanceTypeMap(hash)
for name, t := range *it {
t.Name = name
+ if t.ProviderType == "" {
+ t.ProviderType = name
+ }
(*it)[name] = t
}
return nil
// a number of nanoseconds.
type Duration time.Duration
-// UnmarshalJSON implements json.Unmarshaler
+// UnmarshalJSON implements json.Unmarshaler.
func (d *Duration) UnmarshalJSON(data []byte) error {
if data[0] == '"' {
return d.Set(string(data[1 : len(data)-1]))
return fmt.Errorf("duration must be given as a string like \"600s\" or \"1h30m\"")
}
-// MarshalJSON implements json.Marshaler
+// MarshalJSON implements json.Marshaler.
func (d *Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(d.String())
}
-// String implements fmt.Stringer
+// String implements fmt.Stringer.
func (d Duration) String() string {
return time.Duration(d).String()
}
-// Duration returns a time.Duration
+// Duration returns a time.Duration.
func (d Duration) Duration() time.Duration {
return time.Duration(d)
}
-// Value implements flag.Value
+// Set implements the flag.Value interface and sets the duration value by using time.ParseDuration to parse the string.
func (d *Duration) Set(s string) error {
dur, err := time.ParseDuration(s)
*d = Duration(dur)
package ctxlog
import (
+ "bytes"
"context"
+ "io"
+ "os"
"github.com/sirupsen/logrus"
+ check "gopkg.in/check.v1"
)
var (
// Context returns a new child context such that FromContext(child)
// returns the given logger.
-func Context(ctx context.Context, logger *logrus.Entry) context.Context {
+func Context(ctx context.Context, logger logrus.FieldLogger) context.Context {
return context.WithValue(ctx, loggerCtxKey, logger)
}
// FromContext returns the logger suitable for the given context -- the one
// attached by Context() if applicable, otherwise the
// top-level logger with no fields/values.
-func FromContext(ctx context.Context) *logrus.Entry {
+func FromContext(ctx context.Context) logrus.FieldLogger {
if ctx != nil {
- if logger, ok := ctx.Value(loggerCtxKey).(*logrus.Entry); ok {
+ if logger, ok := ctx.Value(loggerCtxKey).(logrus.FieldLogger); ok {
return logger
}
}
return rootLogger.WithFields(nil)
}
+// New returns a new logger with the indicated format and
+// level.
+func New(out io.Writer, format, level string) logrus.FieldLogger {
+ logger := logrus.New()
+ logger.Out = out
+ setFormat(logger, format)
+ setLevel(logger, level)
+ return logger
+}
+
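+// TestLogger returns a logger that writes through the given gocheck
+// test log, at "debug" level when the ARVADOS_DEBUG environment
+// variable is set (and not "0"), otherwise at "info" level.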
+func TestLogger(c *check.C) logrus.FieldLogger {
+ logger := logrus.New()
+ logger.Out = &logWriter{c.Log}
+ setFormat(logger, "text")
+ if d := os.Getenv("ARVADOS_DEBUG"); d != "0" && d != "" {
+ setLevel(logger, "debug")
+ } else {
+ setLevel(logger, "info")
+ }
+ return logger
+}
+
// SetLevel sets the current logging level. See logrus for level
// names.
func SetLevel(level string) {
- lvl, err := logrus.ParseLevel(level)
- if err != nil {
- logrus.Fatal(err)
+ setLevel(rootLogger, level)
+}
+
+func setLevel(logger *logrus.Logger, level string) {
+ if level == "" {
+ } else if lvl, err := logrus.ParseLevel(level); err != nil {
+ logrus.WithField("Level", level).Fatal("unknown log level")
+ } else {
+ logger.Level = lvl
}
- rootLogger.Level = lvl
}
// SetFormat sets the current logging format to "json" or "text".
func SetFormat(format string) {
+ setFormat(rootLogger, format)
+}
+
+func setFormat(logger *logrus.Logger, format string) {
switch format {
case "text":
- rootLogger.Formatter = &logrus.TextFormatter{
+ logger.Formatter = &logrus.TextFormatter{
FullTimestamp: true,
TimestampFormat: rfc3339NanoFixed,
}
- case "json":
- rootLogger.Formatter = &logrus.JSONFormatter{
+ case "json", "":
+ logger.Formatter = &logrus.JSONFormatter{
TimestampFormat: rfc3339NanoFixed,
}
default:
- logrus.WithField("LogFormat", format).Fatal("unknown log format")
+ logrus.WithField("Format", format).Fatal("unknown log format")
}
}
+
+// logWriter is an io.Writer that writes by calling a "write log"
+// function, typically (*check.C).Log().
+type logWriter struct {
+ logfunc func(...interface{})
+}
+
+func (tl *logWriter) Write(buf []byte) (int, error) {
+ tl.logfunc(string(bytes.TrimRight(buf, "\n")))
+ return len(buf), nil
+}
}
}
arvadosRootUrl = "https://" + arvadosApiHost;
- arvadosRootUrl += (arvadosApiHost.endsWith("/")) ? "" : "/";
if (hostInsecure != null) {
arvadosApiHostInsecure = Boolean.valueOf(hostInsecure);
if streamoffset == current_span[1]:
current_span[1] += segment.segment_size
else:
- stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+ stream_tokens.append(u"{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
current_span = [streamoffset, streamoffset + segment.segment_size]
if current_span is not None:
- stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+ stream_tokens.append(u"{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
if not stream[streamfile]:
- stream_tokens.append("0:0:{0}".format(fout))
+ stream_tokens.append(u"0:0:{0}".format(fout))
return stream_tokens
streampath, filename = split(streampath)
if self._last_open and not self._last_open.closed:
raise errors.AssertionError(
- "can't open '{}' when '{}' is still open".format(
+ u"can't open '{}' when '{}' is still open".format(
filename, self._last_open.name))
if streampath != self.current_stream_name():
self.start_new_stream(streampath)
writer._queued_file.seek(pos)
except IOError as error:
raise errors.StaleWriterStateError(
- "failed to reopen active file {}: {}".format(path, error))
+ u"failed to reopen active file {}: {}".format(path, error))
return writer
def check_dependencies(self):
for path, orig_stat in listitems(self._dependencies):
if not S_ISREG(orig_stat[ST_MODE]):
- raise errors.StaleWriterStateError("{} not file".format(path))
+ raise errors.StaleWriterStateError(u"{} not file".format(path))
try:
now_stat = tuple(os.stat(path))
except OSError as error:
raise errors.StaleWriterStateError(
- "failed to stat {}: {}".format(path, error))
+ u"failed to stat {}: {}".format(path, error))
if ((not S_ISREG(now_stat[ST_MODE])) or
(orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
(orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
- raise errors.StaleWriterStateError("{} changed".format(path))
+ raise errors.StaleWriterStateError(u"{} changed".format(path))
def dump_state(self, copy_func=lambda x: x):
state = {attr: copy_func(getattr(self, attr))
try:
src_path = os.path.realpath(source)
except Exception:
- raise errors.AssertionError("{} not a file path".format(source))
+ raise errors.AssertionError(u"{} not a file path".format(source))
try:
path_stat = os.stat(src_path)
except OSError as stat_error:
self._dependencies[source] = tuple(fd_stat)
elif path_stat is None:
raise errors.AssertionError(
- "could not stat {}: {}".format(source, stat_error))
+ u"could not stat {}: {}".format(source, stat_error))
elif path_stat.st_ino != fd_stat.st_ino:
raise errors.AssertionError(
- "{} changed between open and stat calls".format(source))
+ u"{} changed between open and stat calls".format(source))
else:
self._dependencies[src_path] = tuple(fd_stat)
import json
import os
import re
-import subprocess32 as subprocess
import sys
import tarfile
import tempfile
import shutil
import _strptime
import fcntl
-
from operator import itemgetter
from stat import *
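+# Use the subprocess32 backport of the Python 3 subprocess module on POSIX Python 2.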
+if os.name == "posix" and sys.version_info[0] < 3:
+ import subprocess32 as subprocess
+else:
+ import subprocess
+
import arvados
import arvados.util
import arvados.commands._util as arv_cmd
try:
fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
- raise ResumeCacheConflict("{} locked".format(fileobj.name))
+ raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
def load(self):
self.cache_file.seek(0)
raise ArvPutUploadIsPending()
self._write_stdin(self.filename or 'stdin')
elif not os.path.exists(path):
- raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
+ raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
elif os.path.isdir(path):
# Use absolute paths on cache index so CWD doesn't interfere
# with the caching logic.
elif file_in_local_collection.permission_expired():
# Permission token expired, re-upload file. This will change whenever
# we have an API for refreshing tokens.
- self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
+ self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
should_upload = True
self._local_collection.remove(filename)
elif cached_file_data['size'] == file_in_local_collection.size():
# Inconsistent cache, re-upload the file
should_upload = True
self._local_collection.remove(filename)
- self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+ self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
# Local file differs from cached data, re-upload it.
else:
if file_in_local_collection:
if self.use_cache:
cache_filepath = self._get_cache_filepath()
if self.resume and os.path.exists(cache_filepath):
- self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
+ self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'a+')
else:
# --no-resume means start with an empty cache file.
- self.logger.info("Creating new cache file at {}".format(cache_filepath))
+ self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'w+')
self._cache_filename = self._cache_file.name
self._lock_file(self._cache_file)
try:
fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
- raise ResumeCacheConflict("{} locked".format(fileobj.name))
+ raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
def _save_state(self):
"""
else:
try:
if args.update_collection:
- logger.info("Collection updated: '{}'".format(writer.collection_name()))
+ logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
else:
- logger.info("Collection saved as '{}'".format(writer.collection_name()))
+ logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
if args.portable_data_hash:
output = writer.portable_data_hash()
else:
'ruamel.yaml >=0.15.54, <=0.15.77',
'setuptools',
'ws4py >=0.4.2',
- 'subprocess32 >=3.5.1',
],
+ extras_require={
+ ':os.name=="posix" and python_version<"3"': ['subprocess32 >= 3.5.1'],
+ },
classifiers=[
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 3',
+# -*- coding: utf-8 -*-
+
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
+ def test_unicode_on_filename(self):
+ tmpdir = self.make_tmpdir()
+ fname = u"i❤arvados.txt"
+ with open(os.path.join(tmpdir, fname), 'w') as f:
+ f.write("This is a unicode named file")
+ col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
+
def test_silent_mode_no_errors(self):
self.authorize_with('active')
tmpdir = self.make_tmpdir()
# Generated git-commit.version file
/git-commit.version
+
+# Generated when building distribution packages
+/package-build.version
clnt = HTTPClient.new
if Rails.configuration.sso_insecure
clnt.ssl_config.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ else
+ # Use system CA certificates
+ ["/etc/ssl/certs/ca-certificates.crt",
+ "/etc/pki/tls/certs/ca-bundle.crt"]
+ .select { |ca_path| File.readable?(ca_path) }
+ .each { |ca_path| clnt.ssl_config.add_trust_ca(ca_path) }
end
remote_user = SafeJSON.load(
clnt.get_content('https://' + host + '/arvados/v1/users/current',
sync_past_versions if syncable_updates.any?
if snapshot
snapshot.attributes = self.syncable_updates
+ snapshot.manifest_text = snapshot.signed_manifest_text
snapshot.save
end
end
transaction do
reload
check_lock_fail
- update_attributes!(state: Locked)
+ update_attributes!(state: Locked, lock_count: self.lock_count+1)
end
end
transaction do
reload(lock: 'FOR UPDATE')
check_unlock_fail
- update_attributes!(state: Queued)
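+ # Requeue the container unless it has used up the configured number
+ # of dispatch attempts, in which case cancel it instead.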
+ if self.lock_count < Rails.configuration.max_container_dispatch_attempts
+ update_attributes!(state: Queued)
+ else
+ update_attributes!(state: Cancelled,
+ runtime_status: {
+ error: "Container exceeded 'max_container_dispatch_attempts' (lock_count=#{self.lock_count})."
+ })
+ end
end
end
else
kwargs = {}
end
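+ # Admins can read all containers, so fall back to the generic readable_by scope.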
+ if users_list.select { |u| u.is_admin }.any?
+ return super
+ end
Container.where(ContainerRequest.readable_by(*users_list).where("containers.uuid = container_requests.container_uuid").exists)
end
case self.state
when Locked
- permitted.push :priority, :runtime_status, :log
+ permitted.push :priority, :runtime_status, :log, :lock_count
when Queued
permitted.push :priority
when Running
permitted.push :finished_at, *progress_attrs
when Queued, Locked
- permitted.push :finished_at, :log
+ permitted.push :finished_at, :log, :runtime_status
end
else
# > 0 = auto-create a new version when older than the specified number of seconds.
preserve_version_if_idle: -1
+ # Maximum number of dispatch attempts (lock/unlock cycles) allowed for
+ # a container before it is automatically cancelled.
+ max_container_dispatch_attempts: 5
+
development:
force_ssl: false
cache_classes: false
--- /dev/null
+class AddContainerLockCount < ActiveRecord::Migration
+ def change
+ add_column :containers, :lock_count, :int, :null => false, :default => 0
+ end
+end
runtime_status jsonb DEFAULT '{}'::jsonb,
runtime_user_uuid text,
runtime_auth_scopes jsonb,
- runtime_token text
+ runtime_token text,
+ lock_count integer DEFAULT 0 NOT NULL
);
INSERT INTO schema_migrations (version) VALUES ('20181213183234');
+INSERT INTO schema_migrations (version) VALUES ('20190214214814');
+
version: 42,
current_version_uuid: collections(:collection_owned_by_active).uuid,
manifest_text: manifest_text,
- # portable_data_hash: "d30fe8ae534397864cb96c544f4cf102+47"
}
}
assert_response :success
assert_equal 1, resp['version']
assert_equal resp['uuid'], resp['current_version_uuid']
end
+
+ test "update collection with versioning enabled" do
+ Rails.configuration.collection_versioning = true
+ Rails.configuration.preserve_version_if_idle = 1 # 1 second
+
+ col = collections(:collection_owned_by_active)
+ assert_equal 2, col.version
+ assert col.modified_at < Time.now - 1.second
+
+ token = api_client_authorizations(:active).v2token
+ signed = Blob.sign_locator(
+ 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+ key: Rails.configuration.blob_signing_key,
+ api_token: token)
+ authorize_with_token token
+ put :update, {
+ id: col.uuid,
+ collection: {
+ manifest_text: ". #{signed} 0:3:foo.txt\n",
+ },
+ }
+ assert_response :success
+ assert_equal 3, json_response['version']
+ end
end
assert_operator auth_exp, :<, db_current_time
end
+ test "Exceed maximum lock-unlock cycles" do
+ Rails.configuration.max_container_dispatch_attempts = 3
+
+ set_user_from_auth :active
+ c, cr = minimal_new
+
+ set_user_from_auth :dispatch1
+ assert_equal Container::Queued, c.state
+ assert_equal 0, c.lock_count
+
+ c.lock
+ c.reload
+ assert_equal 1, c.lock_count
+ assert_equal Container::Locked, c.state
+
+ c.unlock
+ c.reload
+ assert_equal 1, c.lock_count
+ assert_equal Container::Queued, c.state
+
+ c.lock
+ c.reload
+ assert_equal 2, c.lock_count
+ assert_equal Container::Locked, c.state
+
+ c.unlock
+ c.reload
+ assert_equal 2, c.lock_count
+ assert_equal Container::Queued, c.state
+
+ c.lock
+ c.reload
+ assert_equal 3, c.lock_count
+ assert_equal Container::Locked, c.state
+
+ c.unlock
+ c.reload
+ assert_equal 3, c.lock_count
+ assert_equal Container::Cancelled, c.state
+
+ assert_raise(ArvadosModel::LockFailedError) do
+ # Cancelled to Locked is not allowed
+ c.lock
+ end
+ end
+
test "Container queued cancel" do
set_user_from_auth :active
c, cr = minimal_new({container_count_max: 1})
assert_equal 1, Container.readable_by(users(:active)).where(state: "Queued").count
end
+ test "Containers with no matching request are readable by admin" do
+ uuids = Container.includes('container_requests').where(container_requests: {uuid: nil}).collect(&:uuid)
+ assert_not_empty uuids
+ assert_empty Container.readable_by(users(:active)).where(uuid: uuids)
+ assert_not_empty Container.readable_by(users(:admin)).where(uuid: uuids)
+ assert_equal uuids.count, Container.readable_by(users(:admin)).where(uuid: uuids).count
+ end
+
test "Container locked cancel" do
set_user_from_auth :active
c, _ = minimal_new
"encoding/json"
"fmt"
"io"
- "io/ioutil"
"os"
"os/exec"
"path/filepath"
// procinfo is saved in each process's lockfile.
type procinfo struct {
- UUID string
- PID int
- Stdout string
- Stderr string
+ UUID string
+ PID int
}
// Detach acquires a lock for the given uuid, and starts the current
// program as a child process (with -no-detach prepended to the given
// arguments so the child knows not to detach again). The lock is
// passed along to the child process.
+//
+// Stdout and stderr in the child process are sent to the systemd
+// journal using the systemd-cat program.
func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
return exitcode(stderr, detach(uuid, args, stdout, stderr))
}
return nil, err
}
defer dirlock.Close()
- lockfile, err := os.OpenFile(filepath.Join(lockdir, lockprefix+uuid+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+ lockfilename := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
+ lockfile, err := os.OpenFile(lockfilename, os.O_CREATE|os.O_RDWR, 0700)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("open %s: %s", lockfilename, err)
}
err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
lockfile.Close()
- return nil, err
+ return nil, fmt.Errorf("lock %s: %s", lockfilename, err)
}
return lockfile, nil
}()
defer lockfile.Close()
lockfile.Truncate(0)
- outfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stdout-")
- if err != nil {
- return err
- }
- defer outfile.Close()
- errfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stderr-")
- if err != nil {
- os.Remove(outfile.Name())
- return err
- }
- defer errfile.Close()
-
- cmd := exec.Command(args[0], append([]string{"-no-detach"}, args[1:]...)...)
- cmd.Stdout = outfile
- cmd.Stderr = errfile
+ cmd := exec.Command("systemd-cat", append([]string{"--identifier=crunch-run", args[0], "-no-detach"}, args[1:]...)...)
// Child inherits lockfile.
cmd.ExtraFiles = []*os.File{lockfile}
// Ensure child isn't interrupted even if we receive signals
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
err = cmd.Start()
if err != nil {
- os.Remove(outfile.Name())
- os.Remove(errfile.Name())
- return err
+ return fmt.Errorf("exec %s: %s", cmd.Path, err)
}
w := io.MultiWriter(stdout, lockfile)
- err = json.NewEncoder(w).Encode(procinfo{
- UUID: uuid,
- PID: cmd.Process.Pid,
- Stdout: outfile.Name(),
- Stderr: errfile.Name(),
+ return json.NewEncoder(w).Encode(procinfo{
+ UUID: uuid,
+ PID: cmd.Process.Pid,
})
- if err != nil {
- os.Remove(outfile.Name())
- os.Remove(errfile.Name())
- return err
- }
- return nil
}
// KillProcess finds the crunch-run process corresponding to the given
if os.IsNotExist(err) {
return nil
} else if err != nil {
- return err
+ return fmt.Errorf("open %s: %s", path, err)
}
defer f.Close()
var pi procinfo
err = json.NewDecoder(f).Decode(&pi)
if err != nil {
- return fmt.Errorf("%s: %s\n", path, err)
+ return fmt.Errorf("decode %s: %s\n", path, err)
}
if pi.UUID != uuid || pi.PID == 0 {
proc, err := os.FindProcess(pi.PID)
if err != nil {
- return err
+ return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
}
err = proc.Signal(signal)
err = proc.Signal(syscall.Signal(0))
}
if err == nil {
- return fmt.Errorf("pid %d: sent signal %d (%s) but process is still alive", pi.PID, signal, signal)
+ return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
}
- fmt.Fprintf(stderr, "pid %d: %s\n", pi.PID, err)
+ fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
return nil
}
// List UUIDs of active crunch-run processes.
func ListProcesses(stdout, stderr io.Writer) int {
- return exitcode(stderr, filepath.Walk(lockdir, func(path string, info os.FileInfo, err error) error {
- if info.IsDir() {
+ // filepath.Walk does not follow symlinks, so we must walk
+ // lockdir+"/." in case lockdir itself is a symlink.
+ walkdir := lockdir + "/."
+ return exitcode(stderr, filepath.Walk(walkdir, func(path string, info os.FileInfo, err error) error {
+ if info.IsDir() && path != walkdir {
return filepath.SkipDir
}
if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
err := os.Remove(path)
dirlock.Close()
if err != nil {
- fmt.Fprintln(stderr, err)
+ fmt.Fprintf(stderr, "unlink %s: %s\n", f.Name(), err)
}
return nil
}
//
// Caller releases the lock by closing the returned file.
func lockall() (*os.File, error) {
- f, err := os.OpenFile(filepath.Join(lockdir, lockprefix+"all"+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+ lockfile := filepath.Join(lockdir, lockprefix+"all"+locksuffix)
+ f, err := os.OpenFile(lockfile, os.O_CREATE|os.O_RDWR, 0700)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("open %s: %s", lockfile, err)
}
err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
if err != nil {
f.Close()
- return nil, err
+ return nil, fmt.Errorf("lock %s: %s", lockfile, err)
}
return f, nil
}
cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+ stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
flag.Parse()
+ if *stdinEnv && !ignoreDetachFlag {
+ // Load env vars on stdin if asked (but not in a
+ // detached child process, in which case stdin is
+ // /dev/null).
+ loadEnv(os.Stdin)
+ }
+
switch {
case *detach && !ignoreDetachFlag:
os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
log.Fatalf("%s: %v", containerId, runerr)
}
}
+
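+// loadEnv reads a JSON-encoded map of environment variables from rdr
+// and applies each entry with os.Setenv, exiting on any error.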
+func loadEnv(rdr io.Reader) {
+ buf, err := ioutil.ReadAll(rdr)
+ if err != nil {
+ log.Fatalf("read stdin: %s", err)
+ }
+ var env map[string]string
+ err = json.Unmarshal(buf, &env)
+ if err != nil {
+ log.Fatalf("decode stdin: %s", err)
+ }
+ for k, v := range env {
+ err = os.Setenv(k, v)
+ if err != nil {
+ log.Fatalf("setenv(%q): %s", k, err)
+ }
+ }
+}
switch {
case err == nil:
return err
+ case strings.Contains(err.Error(), "StatusCode=503"):
+ // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804)
+ return VolumeBusyError
case strings.Contains(err.Error(), "Not Found"):
// "storage: service returned without a response body (404 Not Found)"
return os.ErrNotExist
// - permissions on, authenticated request, unsigned locator
// - permissions on, unauthenticated request, signed locator
// - permissions on, authenticated request, expired locator
+// - permissions on, authenticated request, signed locator, transient error from backend
//
func TestGetHandler(t *testing.T) {
defer teardown()
ExpectStatusCode(t,
"Authenticated request, expired locator",
ExpiredError.HTTPCode, response)
+
+ // Authenticated request, signed locator
+ // => 503 Server busy (transient error)
+
+ // Set up the block-owning volume to respond with errors
+ vols[0].(*MockVolume).Bad = true
+ vols[0].(*MockVolume).BadVolumeError = VolumeBusyError
+ response = IssueRequest(&RequestTester{
+ method: "GET",
+ uri: signedLocator,
+ apiToken: knownToken,
+ })
+ // A transient error from one volume while the other doesn't find the block
+ // should make the service return a 503 so that clients can retry.
+ ExpectStatusCode(t,
+ "Volume backend busy",
+ 503, response)
}
// Test PutBlockHandler on the following situations:
"sync"
"time"
- "github.com/gorilla/mux"
-
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "github.com/gorilla/mux"
)
type router struct {
if !os.IsNotExist(err) {
log.Printf("%s: Get(%s): %s", vol, hash, err)
}
+ // If some volume returns a transient error, return it to the caller
+ // instead of "Not found" so it can retry.
+ if err == VolumeBusyError {
+ errorToCaller = err.(*KeepError)
+ }
continue
}
// Check the file checksum.
DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
ExpiredError = &KeepError{401, "Expired permission signature"}
NotFoundError = &KeepError{404, "Not Found"}
+ VolumeBusyError = &KeepError{503, "Volume backend busy"}
GenericError = &KeepError{500, "Fail"}
FullError = &KeepError{503, "Full"}
SizeRequiredError = &KeepError{411, "Missing Content-Length"}
import (
"bytes"
"context"
+ "errors"
"fmt"
"io/ioutil"
"os"
vols := KeepVM.AllWritable()
vols[0].(*MockVolume).Bad = true
+ vols[0].(*MockVolume).BadVolumeError = errors.New("Bad volume")
// Check that PutBlock stores the data as expected.
if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
Timestamps map[string]time.Time
// Bad volumes return an error for every operation.
- Bad bool
+ Bad bool
+ BadVolumeError error
// Touchable volumes' Touch() method succeeds for a locator
// that has been Put().
v.gotCall("Compare")
<-v.Gate
if v.Bad {
- return errors.New("Bad volume")
+ return v.BadVolumeError
} else if block, ok := v.Store[loc]; ok {
if fmt.Sprintf("%x", md5.Sum(block)) != loc {
return DiskHashError
v.gotCall("Get")
<-v.Gate
if v.Bad {
- return 0, errors.New("Bad volume")
+ return 0, v.BadVolumeError
} else if block, ok := v.Store[loc]; ok {
copy(buf[:len(block)], block)
return len(block), nil
v.gotCall("Put")
<-v.Gate
if v.Bad {
- return errors.New("Bad volume")
+ return v.BadVolumeError
}
if v.Readonly {
return MethodDisabledError
var mtime time.Time
var err error
if v.Bad {
- err = errors.New("Bad volume")
+ err = v.BadVolumeError
} else if t, ok := v.Timestamps[loc]; ok {
mtime = t
} else {
s.files = ["bin/arvados-login-sync", "agpl-3.0.txt"]
s.executables << "arvados-login-sync"
s.required_ruby_version = '>= 2.1.0'
- s.add_runtime_dependency 'arvados', '~> 1.2.0', '>= 1.2.0'
+ s.add_runtime_dependency 'arvados', '~> 1.3.0', '>= 1.3.0'
s.homepage =
'https://arvados.org'
end
permChecker permChecker
subscriptions []v0subscribe
lastMsgID uint64
- log *logrus.Entry
+ log logrus.FieldLogger
mtx sync.Mutex
setupOnce sync.Once
}
arvados_docsite: http://$localip:${services[doc]}/
force_ssl: false
composer_url: http://$localip:${services[composer]}
+ workbench2_url: https://$localip:${services[workbench2-ssl]}
EOF
bundle exec rake assets:precompile
"revision": "b8bc1bf767474819792c23f32d8286a45736f1c6",
"revisionTime": "2016-12-03T19:45:07Z"
},
- {
- "checksumSHA1": "ewGq4nGalpCQOHcmBTdAEQx1wW0=",
- "path": "github.com/mitchellh/mapstructure",
- "revision": "bb74f1db0675b241733089d5a1faa5dd8b0ef57b",
- "revisionTime": "2018-05-11T14:21:26Z"
- },
{
"checksumSHA1": "OFNit1Qx2DdWhotfREKodDNUwCM=",
"path": "github.com/opencontainers/go-digest",