Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>
doc/fonts/*
doc/user/cwl/federated/*
*/docker_image
-docker/jobs/apt.arvados.org.list
+docker/jobs/apt.arvados.org*.list
+docker/jobs/1078ECD7.key
*/en.bootstrap.yml
*font-awesome.css
*.gif
# npm-rails
/node_modules
/npm-debug.log
+
+# Generated when building distribution packages
+/package-build.version
</form>
</li>
<% end %>
+ <% if Rails.configuration.workbench2_url %>
+ <li role="menuitem">
+ <%
+ wb2_url = Rails.configuration.workbench2_url
+ wb2_url += '/' if wb2_url[-1] != '/'
+ wb2_url += 'token'
+ %>
+ <form action="<%= wb2_url %>" method="GET">
+ <input type="hidden" name="api_token" value="<%= Thread.current[:arvados_api_token] %>">
+ <button role="menuitem" type="submit">
+ <i class="fa fa-lg fa-share-square fa-fw"></i> Go to Workbench 2
+ </button>
+ </form>
+ </li>
+ <% end %>
<li role="menuitem">
<%= link_to virtual_machines_user_path(current_user), role: 'menu-item' do %>
<i class="fa fa-lg fa-terminal fa-fw"></i> Virtual machines
# the jobs api is disabled and there are no local git repositories.
#
repositories: true
+
+ #
+ # Add an item to the user menu pointing to workbench2_url, if not false.
+ #
+ # Example:
+ # workbench2_url: https://workbench2.qr1hi.arvadosapi.com
+ #
+ workbench2_url: false
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+include ConfigValidators
+
+ConfigValidators::validate_wb2_url_config()
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require 'uri'
+
+module ConfigValidators
+ def validate_wb2_url_config
+ if Rails.configuration.workbench2_url
+ begin
+ if !URI.parse(Rails.configuration.workbench2_url).is_a?(URI::HTTP)
+ Rails.logger.warn("workbench2_url config is not an HTTP URL: #{Rails.configuration.workbench2_url}")
+ Rails.configuration.workbench2_url = false
+ elsif /.*[\/]{2,}$/.match(Rails.configuration.workbench2_url)
+ Rails.logger.warn("workbench2_url config shouldn't have multiple trailing slashes: #{Rails.configuration.workbench2_url}")
+ Rails.configuration.workbench2_url = false
+ else
+ return true
+ end
+ rescue URI::InvalidURIError
+ Rails.logger.warn("workbench2_url config invalid URL: #{Rails.configuration.workbench2_url}")
+ Rails.configuration.workbench2_url = false
+ end
+ end
+ return false
+ end
+end
+
+++ /dev/null
-1.2.1.20181126194329
end
end
+ [
+ [false, false],
+ ['http://wb2.example.org//', false],
+ ['ftp://wb2.example.org', false],
+ ['wb2.example.org', false],
+ ['http://wb2.example.org', true],
+ ['https://wb2.example.org', true],
+ ['http://wb2.example.org/', true],
+ ['https://wb2.example.org/', true],
+ ].each do |wb2_url_config, wb2_menu_appear|
+ test "workbench2_url=#{wb2_url_config} should#{wb2_menu_appear ? '' : ' not'} show WB2 menu" do
+ Rails.configuration.workbench2_url = wb2_url_config
+ assert_equal wb2_menu_appear, ConfigValidators::validate_wb2_url_config()
+
+ visit page_with_token('active')
+ within('.navbar-fixed-top') do
+ page.find("#notifications-menu").click
+ within('.dropdown-menu') do
+ assert_equal wb2_menu_appear, page.has_text?('Go to Workbench 2')
+ end
+ end
+ end
+ end
+
[
['active', true],
['active_with_prefs_profile_no_getting_started_shown', false],
+++ /dev/null
-+ echo -n 'geckodriver: '
-+ which geckodriver || fatal "No geckodriver. Unable to find Mozilla geckodriver. Please download the server from https://github.com/mozilla/geckodriver/releases and place it somewhere on your PATH. More info at https://developer.mozilla.org/en-US/docs/Mozilla/QA/Marionette/WebDriver."
-
RUN touch /var/lib/rpm/* && yum -q -y install rh-python35
RUN scl enable rh-python35 "easy_install-3.5 pip" && easy_install-2.7 pip
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
# Add epel, we need it for the python-pam dependency
RUN wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
RUN rpm -ivh epel-release-latest-7.noarch.rpm
RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
+# The version of setuptools that comes with CentOS is way too old
+RUN pip install --upgrade setuptools
+
ENV WORKSPACE /arvados
CMD ["scl", "enable", "rh-python35", "/usr/local/rvm/bin/rvm-exec default bash /jenkins/run-build-packages.sh --target centos7"]
ENV DEBIAN_FRONTEND noninteractive
# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip python3-venv python3-dev
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools python3-pip libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip python3-venv python3-dev
# Install virtualenv
RUN /usr/bin/pip install virtualenv
ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
ENV WORKSPACE /arvados
ENV DEBIAN_FRONTEND noninteractive
# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip python3-venv python3-dev
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools python3-pip libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip python3-venv python3-dev
# Install virtualenv
RUN /usr/bin/pip install virtualenv
ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
ENV WORKSPACE /arvados
ENV DEBIAN_FRONTEND noninteractive
# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip python3.4-venv python3.4-dev
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools python3-pip libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip python3.4-venv python3.4-dev
# Install virtualenv
RUN /usr/bin/pip install virtualenv
ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
ENV WORKSPACE /arvados
ENV DEBIAN_FRONTEND noninteractive
# Install dependencies.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev libgnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip tzdata python3-venv python3-dev
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools python3-pip libcurl4-gnutls-dev libgnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip tzdata python3-venv python3-dev
# Install virtualenv
RUN /usr/bin/pip install virtualenv
ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
ENV WORKSPACE /arvados
ADD generated/node-v6.11.2-linux-x64.tar.xz /usr/local/
RUN ln -s /usr/local/node-v6.11.2-linux-x64/bin/* /usr/local/bin/
-# Old versions of setuptools cannot build a schema-salad package.
-RUN pip install --upgrade setuptools
-
RUN git clone --depth 1 git://git.curoverse.com/arvados.git /tmp/arvados && cd /tmp/arvados/services/api && /usr/local/rvm/bin/rvm-exec default bundle && cd /tmp/arvados/apps/workbench && /usr/local/rvm/bin/rvm-exec default bundle && rm -rf /tmp/arvados
ENV WORKSPACE /arvados
function usage {
echo >&2
- echo >&2 "usage: $0 [options]"
+ echo >&2 "usage: WORKSPACE=/path/to/arvados $0 [options]"
echo >&2
echo >&2 "$0 options:"
echo >&2 " -t, --tags version tag for docker"
+ echo >&2 " -r, --repo Arvados package repot to use: dev, testing, stable (default: dev)"
echo >&2 " -u, --upload Upload the images (docker push)"
echo >&2 " --no-cache Don't use build cache"
echo >&2 " -h, --help Display this help and exit"
echo >&2
- echo >&2 " If no options are given, just builds the images."
+ echo >&2 " WORKSPACE=path Path to the Arvados source tree to build from"
+ echo >&2
}
upload=false
+REPO=dev
# NOTE: This requires GNU getopt (part of the util-linux package on Debian-based distros).
-TEMP=`getopt -o hut: \
- --long help,upload,no-cache,tags: \
+TEMP=`getopt -o hut:r: \
+ --long help,upload,no-cache,tags:,repo: \
-n "$0" -- "$@"`
if [ $? != 0 ] ; then echo "Use -h for help"; exit 1 ; fi
;;
esac
;;
+ -r | --repo)
+ case "$2" in
+ "")
+ echo "ERROR: --repo needs a parameter";
+ usage;
+ exit 1
+ ;;
+ *)
+ REPO="$2";
+ shift 2
+ ;;
+ esac
+ ;;
--)
shift
break
exit $EXITCODE
}
+# Sanity check
+if ! [[ -n "$WORKSPACE" ]]; then
+ usage;
+ echo >&2 "Error: WORKSPACE environment variable not set"
+ echo >&2
+ exit 1
+fi
+
+echo $WORKSPACE
+
COLUMNS=80
. $WORKSPACE/build/run-library.sh
checkexit $ECODE "docker push $*"
}
-# Sanity check
-if ! [[ -n "$WORKSPACE" ]]; then
- echo >&2
- echo >&2 "Error: WORKSPACE environment variable not set"
- echo >&2
- exit 1
-fi
-
-echo $WORKSPACE
-
# find the docker binary
DOCKER=`which docker.io`
docker build $NOCACHE \
--build-arg python_sdk_version=${python_sdk_version} \
--build-arg cwl_runner_version=${cwl_runner_version} \
+ --build-arg repo_version=${REPO} \
-t arvados/jobs:$cwl_runner_version_orig .
ECODE=$?
# -f flag removed in Docker 1.12
FORCE=-f
fi
+
+#docker export arvados/jobs:$cwl_runner_version_orig | docker import - arvados/jobs:$cwl_runner_version_orig
+
if ! [[ -z "$version_tag" ]]; then
docker tag $FORCE arvados/jobs:$cwl_runner_version_orig arvados/jobs:"$version_tag"
else
DASHQ_UNLESS_DEBUG=
fi
-EASY_INSTALL2=$(find_easy_install -$PYTHON2_VERSION "")
-EASY_INSTALL3=$(find_easy_install -$PYTHON3_VERSION 3)
-
RUN_BUILD_PACKAGES_PATH="`dirname \"$0\"`"
RUN_BUILD_PACKAGES_PATH="`( cd \"$RUN_BUILD_PACKAGES_PATH\" && pwd )`" # absolutized and normalized
if [ -z "$RUN_BUILD_PACKAGES_PATH" ] ; then
# Python packages
debug_echo -e "\nPython packages\n"
-cd "$WORKSPACE/sdk/pam"
-handle_python_package
-
-cd "$WORKSPACE/sdk/python"
-handle_python_package
-
-cd "$WORKSPACE/sdk/cwl"
-handle_python_package
-
-cd "$WORKSPACE/services/fuse"
-handle_python_package
-
-cd "$WORKSPACE/services/nodemanager"
-handle_python_package
-
# arvados-src
(
cd "$WORKSPACE"
echo ${repo_pkg_list} |grep -q ${complete_pkgname}
if [ $? -eq 0 ] ; then
echo "Package $complete_pkgname exists, not rebuilding!"
- curl -o ./${complete_pkgname} http://apt.arvados.org/pool/${D}/main/${repo_subdir}/${complete_pkgname}
+ curl -s -o ./${complete_pkgname} http://apt.arvados.org/pool/${D}/main/${repo_subdir}/${complete_pkgname}
return 1
elif test -f "$WORKSPACE/packages/$TARGET/processed/${complete_pkgname}" ; then
echo "Package $complete_pkgname exists, not rebuilding!"
else
centos_repo="http://rpm.arvados.org/CentOS/7/dev/x86_64/"
- repo_pkg_list=$(curl -o - ${centos_repo})
+ repo_pkg_list=$(curl -s -o - ${centos_repo})
echo ${repo_pkg_list} |grep -q ${complete_pkgname}
if [ $? -eq 0 ]; then
echo "Package $complete_pkgname exists, not rebuilding!"
- curl -o ./${complete_pkgname} ${centos_repo}${complete_pkgname}
+ curl -s -o ./${complete_pkgname} ${centos_repo}${complete_pkgname}
return 1
elif test -f "$WORKSPACE/packages/$TARGET/processed/${complete_pkgname}" ; then
echo "Package $complete_pkgname exists, not rebuilding!"
local python=""
case "$PACKAGE_TYPE" in
+ python3)
+ python=python3
+ if [[ "$FORMAT" != "rpm" ]]; then
+ pip=pip3
+ else
+ # In CentOS, we use a different mechanism to get the right version of pip
+ pip=pip
+ fi
+ PACKAGE_PREFIX=$PYTHON3_PKG_PREFIX
+ ;;
python)
# All Arvados Python2 packages depend on Python 2.7.
# Make sure we build with that for consistency.
python=python2.7
+ pip=pip
PACKAGE_PREFIX=$PYTHON2_PKG_PREFIX
;;
- python3)
- PACKAGE_PREFIX=$PYTHON3_PKG_PREFIX
- python=python3
- ;;
esac
if [[ "$PKG" != "libpam-arvados" ]] &&
rm -rf dist/*
+ # Get the latest setuptools
+ if ! $pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools; then
+ echo "Error, unable to upgrade setuptools with"
+ echo " $pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools"
+ exit 1
+ fi
if ! $python setup.py $DASHQ_UNLESS_DEBUG sdist; then
- echo "Error, unable to run python setup.py sdist for $PKG"
+ echo "Error, unable to run $python setup.py sdist for $PKG"
exit 1
fi
exit 1
fi
- if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip; then
+ if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip; then
echo "Error, unable to upgrade pip with"
- echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip"
+ echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip"
+ exit 1
+ fi
+ if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools; then
+ echo "Error, unable to upgrade setuptools with"
+ echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools"
exit 1
fi
- if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel; then
+ if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel; then
echo "Error, unable to upgrade wheel with"
- echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel"
+ echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel"
exit 1
fi
if [[ "$TARGET" != "centos7" ]] || [[ "$PYTHON_PKG" != "python-arvados-fuse" ]]; then
- build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
+ build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
else
# centos7 needs these special tweaks to install python-arvados-fuse
- build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG docutils
- PYCURL_SSL_LIBRARY=nss build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
+ build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG docutils
+ PYCURL_SSL_LIBRARY=nss build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
fi
if [[ "$?" != "0" ]]; then
echo "Error, unable to run"
- echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH"
+ echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH"
exit 1
fi
(
set -e
mkdir -p "$GOPATH/src/git.curoverse.com"
- rmdir -v --parents --ignore-fail-on-non-empty "${temp}/GOPATH"
if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
for d in \
"$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
# SPDX-License-Identifier: AGPL-3.0
[Unit]
-Description=Arvados cloud dispatch
+Description=arvados-dispatch-cloud
Documentation=https://doc.arvados.org/
After=network.target
AssertPathExists=/etc/arvados/config.yml
In order to start workflows from Workbench, there needs to be a Docker image tagged @arvados/jobs:latest@. The following command downloads the latest arvados/jobs image from Docker Hub, loads it into Keep, and tags it as 'latest'. In this example @$project_uuid@ should be the UUID of the "Arvados Standard Docker Images" project.
<notextile>
-<pre><code>~$ <span class="userinput">arv-keepdocker --project-uuid $project_uuid --pull arvados/jobs latest</span>
+<pre><code>~$ <span class="userinput">arv-keepdocker --pull arvados/jobs latest --project-uuid $project_uuid</span>
</code></pre></notextile>
If the image needs to be downloaded from Docker Hub, the command can take a few minutes to complete, depending on available network bandwidth.
h3. current master branch
+h4. Stricter collection manifest validation on the API server
+
+As a consequence of "#14482":https://dev.arvados.org/issues/14482, the Ruby SDK performs more rigorous collection manifest validation. Collections created after 2015-05 are unlikely to be invalid; however, you can check for invalid manifests using the script below.
+
+You can set up a new rvm gemset and install the specific arvados gem version for testing, like so:
+
+<notextile>
+<pre><code>~$ <span class="userinput">rvm gemset create rubysdk-test</span>
+~$ <span class="userinput">rvm gemset use rubysdk-test</span>
+~$ <span class="userinput">gem install arvados -v 1.3.1.20190301212059</span>
+</code></pre>
+</notextile>
+
+Next, run the following script using admin credentials; it will scan the whole collection database and report any collection that doesn't pass the check:
+
+{% codeblock as ruby %}
+require 'arvados'
+require 'arvados/keep'
+
+api = Arvados.new
+offset = 0
+batch_size = 100
+invalid = []
+
+while true
+ begin
+ req = api.collection.index(
+ :select => [:uuid, :created_at, :manifest_text],
+ :include_trash => true, :include_old_versions => true,
+ :limit => batch_size, :offset => offset)
+ rescue
+ invalid.each {|c| puts "#{c[:uuid]} (Created at #{c[:created_at]}): #{c[:error]}" }
+ raise
+ end
+
+ req[:items].each do |col|
+ begin
+ Keep::Manifest.validate! col[:manifest_text]
+ rescue Exception => e
+ puts "Collection #{col[:uuid]} manifest not valid"
+ invalid << {uuid: col[:uuid], error: e, created_at: col[:created_at]}
+ end
+ end
+ puts "Checked #{offset} / #{req[:items_available]} - Invalid: #{invalid.size}"
+ offset += req[:limit]
+ break if offset > req[:items_available]
+end
+
+if invalid.empty?
+ puts "No invalid collection manifests found"
+else
+ invalid.each {|c| puts "#{c[:uuid]} (Created at #{c[:created_at]}): #{c[:error]}" }
+end
+{% endcodeblock %}
+
+The script ends with a final report listing each invalid collection by UUID, along with its creation date and error message, so you can take corrective measures if needed.
+
h4. Python packaging change
As part of story "#9945":https://dev.arvados.org/issues/9945, the distribution packaging (deb/rpm) of our Python packages has changed. These packages now include a built-in virtualenv to reduce dependencies on system packages. We have also stopped packaging and publishing backports for all the Python dependencies of our packages, as they are no longer needed.
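
For illustration, here is a minimal sketch of how to call the interpreter and pip that live inside such a package's built-in virtualenv. The path scheme matches the one used by run-library.sh in this change; the package name is only an example, assuming @python-arvados-python-client@ is installed:

<notextile>
<pre><code>~$ <span class="userinput">/usr/share/python2.7/dist/python-arvados-python-client/bin/python --version</span>
~$ <span class="userinput">/usr/share/python2.7/dist/python-arvados-python-client/bin/pip list</span>
</code></pre>
</notextile>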
|Debian 9 ("stretch")|Supported|Latest|
|Ubuntu 14.04 ("trusty")|Supported|Latest|
|Ubuntu 16.04 ("xenial")|Supported|Latest|
+|Ubuntu 18.04 ("bionic")|Supported|Latest|
|Ubuntu 12.04 ("precise")|EOL|8ed7b6dd5d4df93a3f37096afe6d6f81c2a7ef6e (2017-05-03)|
|Debian 7 ("wheezy")|EOL|997479d1408139e96ecdb42a60b4f727f814f6c9 (2016-12-28)|
|CentOS 6 |EOL|997479d1408139e96ecdb42a60b4f727f814f6c9 (2016-12-28)|
h3. Debian and Ubuntu
-Packages are available for Debian 8 ("jessie"), Debian 9 ("stretch"), Ubuntu 14.04 ("trusty") and Ubuntu 16.04 ("xenial").
+Packages are available for Debian 8 ("jessie"), Debian 9 ("stretch"), Ubuntu 14.04 ("trusty"), Ubuntu 16.04 ("xenial") and Ubuntu 18.04 ("bionic").
First, register the Curoverse signing key in apt's database:
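
A minimal sketch of that step (using the same @1078ECD7@ key ID that the arvados/jobs Dockerfile in this change adds) might look like:

<notextile>
<pre><code>~$ <span class="userinput">sudo apt-key adv --keyserver pool.sks-keyservers.net --recv 1078ECD7</span>
</code></pre>
</notextile>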
|Debian 9 ("stretch")|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ stretch main" | sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
|Ubuntu 14.04 ("trusty")[1]|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ trusty main" | sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
|Ubuntu 16.04 ("xenial")[1]|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ xenial main" | sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
+|Ubuntu 18.04 ("bionic")[1]|<notextile><code><span class="userinput">echo "deb http://apt.arvados.org/ bionic main" | sudo tee /etc/apt/sources.list.d/arvados.list</span></code></notextile>|
{% include 'notebox_begin' %}
Docker images are subject to normal Arvados permissions. If you wish to share your Docker image with others (or share a pipeline template that uses your Docker image), you will need to use @arv-keepdocker@ with the @--project-uuid@ option to upload the image to a shared project.
<notextile>
-<pre><code>$ <span class="userinput">arv-keepdocker --project-uuid qr1hi-j7d0g-xxxxxxxxxxxxxxx arvados/jobs-with-r</span>
+<pre><code>$ <span class="userinput">arv-keepdocker arvados/jobs-with-r --project-uuid qr1hi-j7d0g-xxxxxxxxxxxxxxx</span>
</code></pre>
</notextile>
--- /dev/null
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+
+mQENBEzhgeoBCAChhoK1dqpWzNyDWqRGEvdFdkJaA9D2HRwKPfBfjAoePX6ZyrpA
+ItlUsvt/8s/DRiTiPEFQR4S7VqocmU6whJc3gDEGyOM6b1NF873lIfSVwUoE42QE
+a76dO8woOYgLUyxu2mKG+bJgGMumjBJt6ZOndYVjTYB/7sEeVxwmMVulfZe0s6zg
+ut0+SoTYg2R36qIqeIcWllYt97sEYnyy1qXMis4/3IZnuWkS/frsPR3aeUI4W+o2
+NDN1kj49+LMe7Fb5b7jZY08rZbAWXi1rU1hQx4jC9RvYqlT4HNld4Bn7os1IvOOA
+wNiR0oiVdiuDbBxcMvRPktxMrFVjowusRLq/ABEBAAG0PUN1cm92ZXJzZSwgSW5j
+IEF1dG9tYXRpYyBTaWduaW5nIEtleSA8c3lzYWRtaW5AY3Vyb3ZlcnNlLmNvbT6J
+ATgEEwECACIFAlNgYIECGwMGCwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEFcW
+WREQeOzXPkEH/jQJDIYI1dxWcYiA+hczmpaZvN2/pc/kwIW/6a03+6zqmSNkebOE
+TgoDILacSYc17hy20R1/rWyUstOMKcEgFDBlSehhHyl0f7q/w7d8Ais6MabzsPfx
+IceJpsjUg87+BR7qWhgQ0sxmtIF2TKuTFLs+nkGsgSsiBOEF4NvHxuj3HD4y8F27
+HNqrkqwjLS8xJwwH5Gp2uMEVr1AXIH3iSRjJ8X124s8iEP97Q/3IazoYRf9/MCSm
+QEx8KzxwDX6t4bW6O4D01K+e9gdkTY70dcMgJoqm5IsX7yxjEubiOunphtlJnZ9d
+Oi1yBN5UM3pWKAdcfRj4rcfV9Simvpx9av+5AQ0ETOGB6gEIAMAA0HVMG0BbdnU7
+wWgl5eFdT0AUSrXK/WdcKqVEGGv+c68NETSHWZOJX7O46Eao4gY4cTYprVMBzxpY
+/BtQSYLpE0HLvBc1fcFd61Yz4H/9rGSNY0GcIQEbOjbJY5mr8qFsQ1K/mAf3aUL3
+b6ni4sHVicRiRr0Gl4Ihorlskpfu1SHs/C5tvTSVNF9p4vtl5892y1yILQeVpcBs
+NCR7MUpdS49xCpvnAWsDZX+ij6LTR3lzCm/ZLCg4gNuZkjgU9oqVfGkqysW7WZ8S
+OLvzAwUw7i1EIFX8q6QdudGoezxz8m8OgZM1v8AFpYEKlhEPf1W0MSfaRDwrj866
+8nCLruEAEQEAAYkBHwQYAQIACQUCTOGB6gIbDAAKCRBXFlkREHjs199EB/4+p0G1
+3PHxt6rLWSCGXobDOu4ZOA/qnv0D/JhOLroFds5TzQv6vnS8eAkhCTjHVA+b58cm
+kXpI0oYcD4ZP+KK1CHKq2rGfwou7HfAF+icnNqYkeBOkjjbCgkvBlcCInuAuU8JX
+DZMkfFk52+eBKwTjS/J/fQp0vDru8bHLp98WgdRHWfJQ3mc3gz4A5sR6zhrGPW6/
+ssnROS4dC2Ohp35GpgN1KjD3EmEw5RoSBYlyrARCaMsivgIKMxGUEyFZWhuJt3N1
+2MTddRwz28hbmYCi+MzHYDbRv+cSyUDmvXaWhfkNKBepClBA1rTWBcldit5vvlqr
+yPet6wIKrtLGhAqZ
+=CLkG
+-----END PGP PUBLIC KEY BLOCK-----
#
# SPDX-License-Identifier: Apache-2.0
-# Based on Debian Jessie
-FROM debian:jessie
-MAINTAINER Ward Vandewege <ward@curoverse.com>
+# Based on Debian Stretch
+FROM debian:stretch
+MAINTAINER Ward Vandewege <wvandewege@veritasgenetics.com>
ENV DEBIAN_FRONTEND noninteractive
-ADD apt.arvados.org.list /etc/apt/sources.list.d/
-RUN apt-key adv --keyserver pool.sks-keyservers.net --recv 1078ECD7
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3
+RUN apt-get update -q
+RUN apt-get install -yq --no-install-recommends gnupg
+
+ARG repo_version
+RUN echo repo_version $repo_version
+ADD apt.arvados.org-$repo_version.list /etc/apt/sources.list.d/
+
+ADD 1078ECD7.key /tmp/
+RUN cat /tmp/1078ECD7.key | apt-key add -
ARG python_sdk_version
ARG cwl_runner_version
RUN echo cwl_runner_version $cwl_runner_version python_sdk_version $python_sdk_version
RUN apt-get update -q
-RUN apt-get install -yq --no-install-recommends \
- git python-pip python-virtualenv \
- python-dev libgnutls28-dev libcurl4-gnutls-dev nodejs \
+RUN apt-get install -yq --no-install-recommends nodejs \
python-arvados-python-client=$python_sdk_version \
python-arvados-cwl-runner=$cwl_runner_version
-# use the Python executable from the python-arvados-python-client package
-RUN rm -f /usr/bin/python && ln -s /usr/share/python2.7/dist/python-arvados-python-client/bin/python /usr/bin/python
+# use the Python executable from the python-arvados-cwl-runner package
+RUN rm -f /usr/bin/python && ln -s /usr/share/python2.7/dist/python-arvados-cwl-runner/bin/python /usr/bin/python
# Install dependencies and set up system.
RUN /usr/sbin/adduser --disabled-password \
--- /dev/null
+# apt.arvados.org
+deb http://apt.arvados.org/ stretch-dev main
--- /dev/null
+# apt.arvados.org
+deb http://apt.arvados.org/ stretch main
--- /dev/null
+# apt.arvados.org
+deb http://apt.arvados.org/ stretch-testing main
+++ /dev/null
-# apt.arvados.org
-deb http://apt.arvados.org/ jessie-dev main
StorageAccount string
BlobContainer string
DeleteDanglingResourcesAfter arvados.Duration
+ AdminUsername string
+}
+
+const tagKeyInstanceSecret = "InstanceSecret"
+
+type containerWrapper interface {
+ GetBlobReference(name string) *storage.Blob
+ ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
}
type virtualMachinesClientWrapper interface {
}
type azureInstanceSet struct {
- azconfig azureInstanceSetConfig
- vmClient virtualMachinesClientWrapper
- netClient interfacesClientWrapper
- storageAcctClient storageacct.AccountsClient
- azureEnv azure.Environment
- interfaces map[string]network.Interface
- dispatcherID string
- namePrefix string
- ctx context.Context
- stopFunc context.CancelFunc
- stopWg sync.WaitGroup
- deleteNIC chan string
- deleteBlob chan storage.Blob
- logger logrus.FieldLogger
+ azconfig azureInstanceSetConfig
+ vmClient virtualMachinesClientWrapper
+ netClient interfacesClientWrapper
+ blobcont containerWrapper
+ azureEnv azure.Environment
+ interfaces map[string]network.Interface
+ dispatcherID string
+ namePrefix string
+ ctx context.Context
+ stopFunc context.CancelFunc
+ stopWg sync.WaitGroup
+ deleteNIC chan string
+ deleteBlob chan storage.Blob
+ logger logrus.FieldLogger
}
func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
return nil, err
}
- ap := azureInstanceSet{logger: logger}
- err = ap.setup(azcfg, string(dispatcherID))
+ az := azureInstanceSet{logger: logger}
+ az.ctx, az.stopFunc = context.WithCancel(context.Background())
+ err = az.setup(azcfg, string(dispatcherID))
if err != nil {
+ az.stopFunc()
return nil, err
}
- return &ap, nil
+ return &az, nil
}
func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
az.vmClient = &virtualMachinesClientImpl{vmClient}
az.netClient = &interfacesClientImpl{netClient}
- az.storageAcctClient = storageAcctClient
+
+ result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
+ if err != nil {
+ az.logger.WithError(err).Warn("Couldn't get account keys")
+ return err
+ }
+
+ key1 := *(*result.Keys)[0].Value
+ client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
+ if err != nil {
+ az.logger.WithError(err).Warn("Couldn't make client")
+ return err
+ }
+
+ blobsvc := client.GetBlobService()
+ az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
az.dispatcherID = dispatcherID
az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
- az.ctx, az.stopFunc = context.WithCancel(context.Background())
go func() {
az.stopWg.Add(1)
defer az.stopWg.Done()
instanceType arvados.InstanceType,
imageID cloud.ImageID,
newTags cloud.InstanceTags,
+ initCommand cloud.InitCommand,
publicKey ssh.PublicKey) (cloud.Instance, error) {
az.stopWg.Add(1)
defer az.stopWg.Done()
- if len(newTags["node-token"]) == 0 {
- return nil, fmt.Errorf("Must provide tag 'node-token'")
- }
-
name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
if err != nil {
return nil, err
tags["dispatch-"+k] = &newstr
}
- tags["dispatch-instance-type"] = &instanceType.Name
-
nicParameters := network.Interface{
Location: &az.azconfig.Location,
Tags: tags,
return nil, wrapAzureError(err)
}
- instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
+ blobname := fmt.Sprintf("%s-os.vhd", name)
+ instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
az.azconfig.StorageAccount,
az.azureEnv.StorageEndpointSuffix,
az.azconfig.BlobContainer,
- name)
+ blobname)
- customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
-echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
+ customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
vmParameters := compute.VirtualMachine{
Location: &az.azconfig.Location,
},
OsProfile: &compute.OSProfile{
ComputerName: &name,
- AdminUsername: to.StringPtr("crunch"),
+ AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
LinuxConfiguration: &compute.LinuxConfiguration{
DisablePasswordAuthentication: to.BoolPtr(true),
SSH: &compute.SSHConfiguration{
PublicKeys: &[]compute.SSHPublicKey{
- compute.SSHPublicKey{
- Path: to.StringPtr("/home/crunch/.ssh/authorized_keys"),
+ {
+ Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
},
},
vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
if err != nil {
+ _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
+ if delerr != nil {
+ az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
+ }
+
+ _, delerr = az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
+ if delerr != nil {
+ az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
+ }
+
return nil, wrapAzureError(err)
}
if result.Value().Tags["created-at"] != nil {
createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
if err == nil {
- if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
- az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
+ if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
+ az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
az.deleteNIC <- *result.Value().Name
}
}
// leased to a VM) and haven't been modified for
// DeleteDanglingResourcesAfter seconds.
func (az *azureInstanceSet) manageBlobs() {
- result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
- if err != nil {
- az.logger.WithError(err).Warn("Couldn't get account keys")
- return
- }
-
- key1 := *(*result.Keys)[0].Value
- client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
- if err != nil {
- az.logger.WithError(err).Warn("Couldn't make client")
- return
- }
-
- blobsvc := client.GetBlobService()
- blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
page := storage.ListBlobsParameters{Prefix: az.namePrefix}
timestamp := time.Now()
for {
- response, err := blobcont.ListBlobs(page)
+ response, err := az.blobcont.ListBlobs(page)
if err != nil {
az.logger.WithError(err).Warn("Error listing blobs")
return
}
func (ai *azureInstance) Address() string {
- return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
-}
-
-func (ai *azureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
- ai.provider.stopWg.Add(1)
- defer ai.provider.stopWg.Done()
-
- remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
-
- tags := ai.Tags()
-
- tg := tags["ssh-pubkey-fingerprint"]
- if tg != "" {
- if remoteFingerprint == tg {
- return nil
- }
- return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
- }
-
- nodetokenTag := tags["node-token"]
- if nodetokenTag == "" {
- return fmt.Errorf("Missing node token tag")
- }
+ if ai.nic.IPConfigurations != nil &&
+ len(*ai.nic.IPConfigurations) > 0 &&
+ (*ai.nic.IPConfigurations)[0].PrivateIPAddress != nil {
- sess, err := client.NewSession()
- if err != nil {
- return err
- }
-
- nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
- if err != nil {
- return err
+ return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
}
+ return ""
+}
- nodetoken := strings.TrimSpace(string(nodetokenbytes))
-
- expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
-
- if strings.TrimSpace(nodetoken) != expectedToken {
- return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
- }
-
- sess, err = client.NewSession()
- if err != nil {
- return err
- }
-
- keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
- if err != nil {
- return err
- }
-
- sp := strings.Split(string(keyfingerprintbytes), " ")
-
- if remoteFingerprint != sp[1] {
- return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
- }
+func (ai *azureInstance) RemoteUser() string {
+ return ai.provider.azconfig.AdminUsername
+}
- tags["ssh-pubkey-fingerprint"] = sp[1]
- delete(tags, "node-token")
- ai.SetTags(tags)
- return nil
+func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+ return cloud.ErrNotImplemented
}
// StorageAccount: example
// BlobContainer: vhds
// DeleteDanglingResourcesAfter: 20s
+// AdminUsername: crunch
package azure
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/to"
- "github.com/jmcvetta/randutil"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
return network.InterfaceListResultIterator{}, nil
}
+type BlobContainerStub struct{}
+
+func (*BlobContainerStub) GetBlobReference(name string) *storage.Blob {
+ return nil
+}
+
+func (*BlobContainerStub) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+ return storage.BlobListResponse{}, nil
+}
+
type testConfig struct {
ImageIDForTestSuite string
DriverParameters json.RawMessage
ap.ctx, ap.stopFunc = context.WithCancel(context.Background())
ap.vmClient = &VirtualMachinesClientStub{}
ap.netClient = &InterfacesClientStub{}
+ ap.blobcont = &BlobContainerStub{}
return &ap, cloud.ImageID("blob"), cluster, nil
}
pk, _, _, _, err := ssh.ParseAuthorizedKey(testKey)
c.Assert(err, check.IsNil)
- nodetoken, err := randutil.String(40, "abcdefghijklmnopqrstuvwxyz0123456789")
- c.Assert(err, check.IsNil)
-
inst, err := ap.Create(cluster.InstanceTypes["tiny"],
img, map[string]string{
- "node-token": nodetoken},
- pk)
+ "TestTagName": "test tag value",
+ }, "umask 0600; echo -n test-file-data >/var/run/test-file", pk)
c.Assert(err, check.IsNil)
- tg := inst.Tags()
- log.Printf("Result %v %v %v", inst.String(), inst.Address(), tg)
+ tags := inst.Tags()
+ c.Check(tags["TestTagName"], check.Equals, "test tag value")
+ c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
}
c.Assert(err, check.IsNil)
if len(l) > 0 {
-
sshclient, err := SetupSSHClient(c, l[0])
c.Assert(err, check.IsNil)
+ defer sshclient.Conn.Close()
sess, err := sshclient.NewSession()
c.Assert(err, check.IsNil)
-
- out, err := sess.Output("cat /home/crunch/node-token")
+ defer sess.Close()
+ _, err = sess.Output("find /var/run/test-file -maxdepth 0 -user root -perm 0600")
c.Assert(err, check.IsNil)
- log.Printf("%v", string(out))
-
- sshclient.Conn.Close()
+ sess, err = sshclient.NewSession()
+ c.Assert(err, check.IsNil)
+ defer sess.Close()
+ out, err := sess.Output("sudo cat /var/run/test-file")
+ c.Assert(err, check.IsNil)
+ c.Check(string(out), check.Equals, "test-file-data")
}
}
import (
"encoding/json"
+ "errors"
"io"
"time"
Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
}
+var ErrNotImplemented = errors.New("not implemented")
+
// An ExecutorTarget is a remote command execution service.
type ExecutorTarget interface {
// SSH server hostname or IP address, or empty string if
// unknown while instance is booting.
Address() string
+ // Remote username to send during SSH authentication.
+ RemoteUser() string
+
// Return nil if the given public key matches the instance's
// SSH server key. If the provided Dialer is not nil,
// VerifyHostKey can use it to make outgoing network
// connections from the instance -- e.g., to use the cloud's
// "this instance's metadata" API.
+ //
+ // Return ErrNotImplemented if no verification mechanism is
+ // available.
VerifyHostKey(ssh.PublicKey, *ssh.Client) error
}
// All public methods of an InstanceSet, and all public methods of the
// instances it returns, are goroutine safe.
type InstanceSet interface {
- // Create a new instance. If supported by the driver, add the
+ // Create a new instance with the given type, image, and
+ // initial set of tags. If supported by the driver, add the
// provided public key to /root/.ssh/authorized_keys.
//
+ // The given InitCommand should be executed on the newly
+ // created instance. This is optional for a driver whose
+ // instances' VerifyHostKey() method never returns
+ // ErrNotImplemented. InitCommand will be under 1 KiB.
+ //
// The returned error should implement RateLimitError and
// QuotaError where applicable.
- Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+ Create(arvados.InstanceType, ImageID, InstanceTags, InitCommand, ssh.PublicKey) (Instance, error)
// Return all instances, including ones that are booting or
// shutting down. Optionally, filter out nodes that don't have
Stop()
}
+type InitCommand string
+
// A Driver returns an InstanceSet that uses the given InstanceSetID
// and driver-dependent configuration parameters.
//
package controller
import (
+ "context"
+
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/service"
"git.curoverse.com/arvados.git/sdk/go/arvados"
var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
-func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+func newHandler(_ context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
return &Handler{Cluster: cluster, NodeProfile: np}
}
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/sirupsen/logrus"
var _ = check.Suite(&FederationSuite{})
type FederationSuite struct {
- log *logrus.Logger
+ log logrus.FieldLogger
// testServer and testHandler are the controller being tested,
// "zhome".
testServer *httpserver.Server
}
func (s *FederationSuite) SetUpTest(c *check.C) {
- s.log = logrus.New()
- s.log.Formatter = &logrus.JSONFormatter{}
- s.log.Out = &logWriter{c.Log}
+ s.log = ctxlog.TestLogger(c)
s.remoteServer = newServerFromIntegrationTestEnv(c)
c.Assert(s.remoteServer.Start(), check.IsNil)
package controller
import (
+ "context"
"encoding/json"
"net/http"
"net/http/httptest"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
check "gopkg.in/check.v1"
)
type HandlerSuite struct {
cluster *arvados.Cluster
handler http.Handler
+ ctx context.Context
+ cancel context.CancelFunc
}
func (s *HandlerSuite) SetUpTest(c *check.C) {
+ s.ctx, s.cancel = context.WithCancel(context.Background())
+ s.ctx = ctxlog.Context(s.ctx, ctxlog.New(os.Stderr, "json", "debug"))
s.cluster = &arvados.Cluster{
ClusterID: "zzzzz",
PostgreSQL: integrationTestCluster().PostgreSQL,
},
}
node := s.cluster.NodeProfiles["*"]
- s.handler = newHandler(s.cluster, &node)
+ s.handler = newHandler(s.ctx, s.cluster, &node)
+}
+
+func (s *HandlerSuite) TearDownTest(c *check.C) {
+ s.cancel()
}
func (s *HandlerSuite) TestProxyDiscoveryDoc(c *check.C) {
package controller
import (
- "bytes"
"net/http"
"os"
"path/filepath"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
- "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
-// logWriter is an io.Writer that writes by calling a "write log"
-// function, typically (*check.C)Log().
-type logWriter struct {
- logfunc func(...interface{})
-}
-
-func (tl *logWriter) Write(buf []byte) (int, error) {
- tl.logfunc(string(bytes.TrimRight(buf, "\n")))
- return len(buf), nil
-}
-
func integrationTestCluster() *arvados.Cluster {
cfg, err := arvados.GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
if err != nil {
// Return a new unstarted controller server, using the Rails API
// provided by the integration-testing environment.
func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
- log := logrus.New()
- log.Formatter = &logrus.JSONFormatter{}
- log.Out = &logWriter{c.Log}
+ log := ctxlog.TestLogger(c)
nodeProfile := arvados.NodeProfile{
Controller: arvados.SystemServiceInstance{Listen: ":"},
package dispatchcloud
import (
+ "context"
+
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/service"
"git.curoverse.com/arvados.git/sdk/go/arvados"
var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
-func newHandler(cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
- d := &dispatcher{Cluster: cluster}
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
+ d := &dispatcher{Cluster: cluster, Context: ctx}
go d.Start()
return d
}
defer cq.mtx.Unlock()
ctr := cq.current[uuid].Container
if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
- delete(cq.current, uuid)
+ cq.delEnt(uuid, ctr.State)
}
}
cq.current[uuid] = cur
}
}
- for uuid := range cq.current {
+ for uuid, ent := range cq.current {
if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
// Don't expunge an entry that was
// added/updated locally after we started
// the poll response (evidently it's
// cancelled, completed, deleted, or taken by
// a different dispatcher).
- delete(cq.current, uuid)
+ cq.delEnt(uuid, ent.Container.State)
}
}
cq.dontupdate = nil
return nil
}
+// Caller must have lock.
+func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
+ cq.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": state,
+ }).Info("dropping container from queue")
+ delete(cq.current, uuid)
+}
+
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
it, err := cq.chooseType(&ctr)
if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
}()
return
}
+ cq.logger.WithFields(logrus.Fields{
+ "ContainerUUID": ctr.UUID,
+ "State": ctr.State,
+ "Priority": ctr.Priority,
+ "InstanceType": it.Name,
+ }).Info("adding container to queue")
cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
}
package dispatchcloud
import (
+ "context"
"crypto/md5"
"encoding/json"
"fmt"
"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
type dispatcher struct {
Cluster *arvados.Cluster
+ Context context.Context
InstanceSetID cloud.InstanceSetID
logger logrus.FieldLogger
}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
- disp.logger = logrus.StandardLogger()
+ disp.logger = ctxlog.FromContext(disp.Context)
if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
}
disp.instanceSet = instanceSet
disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+ disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
if disp.Cluster.ManagementToken == "" {
mux := httprouter.New()
mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
- mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/hold", disp.apiInstanceHold)
- mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/drain", disp.apiInstanceDrain)
- mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/run", disp.apiInstanceRun)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
ErrorLog: disp.logger,
})
if pollInterval <= 0 {
pollInterval = defaultPollInterval
}
- sched := scheduler.New(disp.logger, disp.queue, disp.pool, staleLockTimeout, pollInterval)
+ sched := scheduler.New(disp.Context, disp.queue, disp.pool, staleLockTimeout, pollInterval)
sched.Start()
defer sched.Stop()
}
func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
- params, _ := r.Context().Value(httprouter.ParamsKey).(httprouter.Params)
- id := cloud.InstanceID(params.ByName("instance_id"))
+ id := cloud.InstanceID(r.FormValue("instance_id"))
+ if id == "" {
+ httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+ return
+ }
err := disp.pool.SetIdleBehavior(id, want)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusNotFound)
package dispatchcloud
import (
+ "context"
"encoding/json"
"io/ioutil"
"math/rand"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&DispatcherSuite{})
type DispatcherSuite struct {
+ ctx context.Context
+ cancel context.CancelFunc
cluster *arvados.Cluster
stubDriver *test.StubDriver
disp *dispatcher
}
func (s *DispatcherSuite) SetUpTest(c *check.C) {
+ s.ctx, s.cancel = context.WithCancel(context.Background())
+ s.ctx = ctxlog.Context(s.ctx, ctxlog.TestLogger(c))
dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
c.Assert(err, check.IsNil)
},
},
}
- s.disp = &dispatcher{Cluster: s.cluster}
+ s.disp = &dispatcher{
+ Cluster: s.cluster,
+ Context: s.ctx,
+ }
// Test cases can modify s.cluster before calling
// initialize(), and then modify private state before calling
// go run().
}
func (s *DispatcherSuite) TearDownTest(c *check.C) {
+ s.cancel()
s.disp.Close()
}
// the size of the manifest.
//
// Use the following heuristic:
- // - Start with the length of the mainfest (n)
+ // - Start with the length of the manifest (n)
// - Subtract 80 characters for the filename and file segment
// - Divide by 42 to get the number of block identifiers ('hash\+size\ ' is 32+1+8+1)
// - Assume each block is full, multiply by 64 MiB
var stale []string
timeout := time.NewTimer(sch.staleLockTimeout)
waiting:
- for {
+ for sch.pool.CountWorkers()[worker.StateUnknown] > 0 {
running := sch.pool.Running()
qEntries, _ := sch.queue.Entries()
select {
case <-wp:
- // Stop waiting if all workers have been
- // contacted.
- if sch.pool.CountWorkers()[worker.StateUnknown] == 0 {
- break waiting
- }
case <-timeout.C:
// Give up.
break waiting
import (
"sort"
+ "time"
"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
"git.curoverse.com/arvados.git/sdk/go/arvados"
overquota = sorted[i:]
break tryrun
}
- sch.bgLock(logger, ctr.UUID)
+ go sch.lockContainer(logger, ctr.UUID)
unalloc[it]--
case arvados.ContainerStateLocked:
if unalloc[it] > 0 {
}
}
-// Start an API call to lock the given container, and return
-// immediately while waiting for the response in a new goroutine. Do
-// nothing if a lock request is already in progress for this
-// container.
-func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
- logger.Debug("locking")
- sch.mtx.Lock()
- defer sch.mtx.Unlock()
- if sch.locking[uuid] {
- logger.Debug("locking in progress, doing nothing")
+// Lock the given container. Should be called in a new goroutine.
+func (sch *Scheduler) lockContainer(logger logrus.FieldLogger, uuid string) {
+ if !sch.uuidLock(uuid, "lock") {
return
}
+ defer sch.uuidUnlock(uuid)
if ctr, ok := sch.queue.Get(uuid); !ok || ctr.State != arvados.ContainerStateQueued {
// This happens if the container has been cancelled or
// locked since runQueue called sch.queue.Entries(),
- // possibly by a bgLock() call from a previous
+ // possibly by a lockContainer() call from a previous
// runQueue iteration. In any case, we will respond
// appropriately on the next runQueue iteration, which
// will have already been triggered by the queue
logger.WithField("State", ctr.State).Debug("container no longer queued by the time we decided to lock it, doing nothing")
return
}
- sch.locking[uuid] = true
- go func() {
- defer func() {
- sch.mtx.Lock()
- defer sch.mtx.Unlock()
- delete(sch.locking, uuid)
- }()
- err := sch.queue.Lock(uuid)
- if err != nil {
- logger.WithError(err).Warn("error locking container")
- return
- }
- logger.Debug("lock succeeded")
- ctr, ok := sch.queue.Get(uuid)
- if !ok {
- logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
- } else if ctr.State != arvados.ContainerStateLocked {
- logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
- }
- }()
+ err := sch.queue.Lock(uuid)
+ if err != nil {
+ logger.WithError(err).Warn("error locking container")
+ return
+ }
+ logger.Debug("lock succeeded")
+ ctr, ok := sch.queue.Get(uuid)
+ if !ok {
+ logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
+ } else if ctr.State != arvados.ContainerStateLocked {
+ logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
+ }
+}
+
+// Acquire a non-blocking lock for specified UUID, returning true if
+// successful. The op argument is used only for debug logs.
+//
+// If the lock is not available, uuidLock arranges to wake up the
+// scheduler after a short delay, so it can retry whatever operation
+// is trying to get the lock (if that operation is still worth doing).
+//
+// This mechanism helps avoid spamming the controller/database with
+// concurrent updates for any single container, even when the
+// scheduler loop is running frequently.
+func (sch *Scheduler) uuidLock(uuid, op string) bool {
+ sch.mtx.Lock()
+ defer sch.mtx.Unlock()
+ logger := sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "Op": op,
+ })
+ if op, locked := sch.uuidOp[uuid]; locked {
+ logger.Debugf("uuidLock not available, Op=%s in progress", op)
+ // Make sure the scheduler loop wakes up to retry.
+ sch.wakeup.Reset(time.Second / 4)
+ return false
+ }
+ logger.Debug("uuidLock acquired")
+ sch.uuidOp[uuid] = op
+ return true
+}
+
+func (sch *Scheduler) uuidUnlock(uuid string) {
+ sch.mtx.Lock()
+ defer sch.mtx.Unlock()
+ delete(sch.uuidOp, uuid)
}
package scheduler
import (
+ "context"
"sync"
"time"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
)
// immediately. Don't try to create any other nodes after the failed
// create.
func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
queue := test.Queue{
ChooseType: chooseType,
Containers: []arvados.Container{
running: map[string]time.Time{},
canCreate: 0,
}
- New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
c.Check(pool.running, check.HasLen, 1)
// If Create() fails, shutdown some nodes, and don't call Create()
// again. Don't call Create() at all if AtQuota() is true.
func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
for quota := 0; quota < 2; quota++ {
c.Logf("quota=%d", quota)
shouldCreate := []arvados.InstanceType{}
starts: []string{},
canCreate: 0,
}
- New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, shouldCreate)
c.Check(pool.starts, check.DeepEquals, []string{})
c.Check(pool.shutdowns, check.Not(check.Equals), 0)
// Start lower-priority containers while waiting for new/existing
// workers to come up for higher-priority containers.
func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
pool := stubPool{
unalloc: map[arvados.InstanceType]int{
test.InstanceType(1): 2,
},
}
queue.Update()
- New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
running := map[string]bool{}
package scheduler
import (
+ "context"
"sync"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/sirupsen/logrus"
)
staleLockTimeout time.Duration
queueUpdateInterval time.Duration
- locking map[string]bool
- mtx sync.Mutex
+ uuidOp map[string]string // operation in progress: "lock", "cancel", ...
+ mtx sync.Mutex
+ wakeup *time.Timer
runOnce sync.Once
stop chan struct{}
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
return &Scheduler{
- logger: logger,
+ logger: ctxlog.FromContext(ctx),
queue: queue,
pool: pool,
staleLockTimeout: staleLockTimeout,
queueUpdateInterval: queueUpdateInterval,
+ wakeup: time.NewTimer(time.Second),
stop: make(chan struct{}),
stopped: make(chan struct{}),
- locking: map[string]bool{},
+ uuidOp: map[string]string{},
}
}
// Ensure the queue is fetched once before attempting anything.
for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
sch.logger.Errorf("error updating queue: %s", err)
- d := sch.queueUpdateInterval / 60
+ d := sch.queueUpdateInterval / 10
+ if d < time.Second {
+ d = time.Second
+ }
sch.logger.Infof("waiting %s before retry", d)
time.Sleep(d)
}
return
case <-queueNotify:
case <-poolNotify:
+ case <-sch.wakeup.C:
}
}
}
import (
"fmt"
- "time"
"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
"git.curoverse.com/arvados.git/sdk/go/arvados"
// cancelled.
func (sch *Scheduler) sync() {
running := sch.pool.Running()
- cancel := func(ent container.QueueEnt, reason string) {
- uuid := ent.Container.UUID
- logger := sch.logger.WithField("ContainerUUID", uuid)
- logger.Infof("cancelling container because %s", reason)
- err := sch.queue.Cancel(uuid)
- if err != nil {
- logger.WithError(err).Print("error cancelling container")
- }
- }
- kill := func(ent container.QueueEnt, reason string) {
- uuid := ent.Container.UUID
- logger := sch.logger.WithField("ContainerUUID", uuid)
- logger.Debugf("killing crunch-run process because %s", reason)
- sch.pool.KillContainer(uuid)
- }
qEntries, qUpdated := sch.queue.Entries()
for uuid, ent := range qEntries {
exited, running := running[uuid]
switch ent.Container.State {
case arvados.ContainerStateRunning:
if !running {
- go cancel(ent, "not running on any worker")
+ go sch.cancel(ent, "not running on any worker")
} else if !exited.IsZero() && qUpdated.After(exited) {
- go cancel(ent, "state=\"Running\" after crunch-run exited")
+ go sch.cancel(ent, "state=\"Running\" after crunch-run exited")
} else if ent.Container.Priority == 0 {
- go kill(ent, fmt.Sprintf("priority=%d", ent.Container.Priority))
+ go sch.kill(ent, "priority=0")
}
case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
if running {
// of kill() will be to make the
// worker available for the next
// container.
- go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
} else {
sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
// a network outage and is still
// preparing to run a container that
// has already been unlocked/requeued.
- go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
}
case arvados.ContainerStateLocked:
if running && !exited.IsZero() && qUpdated.After(exited) {
- logger := sch.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "Exited": time.Since(exited).Seconds(),
- })
- logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
- err := sch.queue.Unlock(uuid)
- if err != nil {
- logger.WithError(err).Info("error requeueing container")
- }
+ go sch.requeue(ent, "crunch-run exited")
+ } else if running && exited.IsZero() && ent.Container.Priority == 0 {
+ go sch.kill(ent, "priority=0")
+ } else if !running && ent.Container.Priority == 0 {
+ go sch.requeue(ent, "priority=0")
}
default:
- sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
+ sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": ent.Container.State,
+ }).Error("BUG: unexpected state")
}
}
}
+
+func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
+ uuid := ent.Container.UUID
+ if !sch.uuidLock(uuid, "cancel") {
+ return
+ }
+ defer sch.uuidUnlock(uuid)
+ logger := sch.logger.WithField("ContainerUUID", uuid)
+ logger.Infof("cancelling container because %s", reason)
+ err := sch.queue.Cancel(uuid)
+ if err != nil {
+ logger.WithError(err).Print("error cancelling container")
+ }
+}
+
+func (sch *Scheduler) kill(ent container.QueueEnt, reason string) {
+ uuid := ent.Container.UUID
+ logger := sch.logger.WithField("ContainerUUID", uuid)
+ logger.Debugf("killing crunch-run process because %s", reason)
+ sch.pool.KillContainer(uuid)
+}
+
+func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
+ uuid := ent.Container.UUID
+	if !sch.uuidLock(uuid, "requeue") {
+ return
+ }
+ defer sch.uuidUnlock(uuid)
+ logger := sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": ent.Container.State,
+ "Priority": ent.Container.Priority,
+ })
+ logger.Infof("requeueing locked container because %s", reason)
+ err := sch.queue.Unlock(uuid)
+ if err != nil {
+ logger.WithError(err).Error("error requeueing container")
+ }
+}
type Executor struct {
target cloud.ExecutorTarget
targetPort string
+ targetUser string
signers []ssh.Signer
mtx sync.RWMutex // controls access to instance after creation
if h, p, err := net.SplitHostPort(addr); err != nil || p == "" {
// Target address does not specify a port. Use
// targetPort, or "ssh".
+ if h == "" {
+ h = addr
+ }
if p = exr.targetPort; p == "" {
p = "ssh"
}
}
var receivedKey ssh.PublicKey
client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
- User: "root",
+ User: target.RemoteUser(),
Auth: []ssh.AuthMethod{
ssh.PublicKeys(exr.signers...),
},
return 0
},
HostKey: hostpriv,
+ AuthorizedUser: "username",
AuthorizedKeys: []ssh.PublicKey{clientpub},
},
}
return uint32(exitcode)
},
HostKey: hostpriv,
+ AuthorizedUser: "username",
AuthorizedKeys: []ssh.PublicKey{clientpub},
},
}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package test
-
-import (
- "os"
-
- "github.com/sirupsen/logrus"
-)
-
-func Logger() logrus.FieldLogger {
- logger := logrus.StandardLogger()
- if os.Getenv("ARVADOS_DEBUG") != "" {
- logger.SetLevel(logrus.DebugLevel)
- }
- return logger
-}
type SSHService struct {
Exec SSHExecFunc
HostKey ssh.Signer
+ AuthorizedUser string
AuthorizedKeys []ssh.PublicKey
listener net.Listener
return ln.Addr().String()
}
+// RemoteUser returns the username that will be accepted.
+func (ss *SSHService) RemoteUser() string {
+ return ss.AuthorizedUser
+}
+
// Close shuts down the server and releases resources. Established
// connections are unaffected.
func (ss *SSHService) Close() {
}
config.AddHostKey(ss.HostKey)
- listener, err := net.Listen("tcp", ":")
+ listener, err := net.Listen("tcp", "127.0.0.1:")
if err != nil {
ss.err = err
return
"errors"
"fmt"
"io"
+ "io/ioutil"
math_rand "math/rand"
"regexp"
"strings"
}
sis := StubInstanceSet{
driver: sd,
+ logger: logger,
servers: map[cloud.InstanceID]*StubVM{},
}
sd.instanceSets = append(sd.instanceSets, &sis)
type StubInstanceSet struct {
driver *StubDriver
+ logger logrus.FieldLogger
servers map[cloud.InstanceID]*StubVM
mtx sync.RWMutex
stopped bool
allowInstancesCall time.Time
}
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
if sis.driver.HoldCloudOps {
sis.driver.holdCloudOps <- true
}
id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
tags: copyTags(tags),
providerType: it.ProviderType,
+ initCommand: cmd,
}
svm.SSHService = SSHService{
HostKey: sis.driver.HostKey,
+ AuthorizedUser: "root",
AuthorizedKeys: ak,
Exec: svm.Exec,
}
sis *StubInstanceSet
id cloud.InstanceID
tags cloud.InstanceTags
+ initCommand cloud.InitCommand
providerType string
SSHService SSHService
running map[string]bool
}
func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+ stdinData, err := ioutil.ReadAll(stdin)
+ if err != nil {
+ fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
+ return 1
+ }
queue := svm.sis.driver.Queue
uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
if eta := svm.Boot.Sub(time.Now()); eta > 0 {
fmt.Fprint(stderr, "crunch-run: command not found\n")
return 1
}
- if strings.HasPrefix(command, "crunch-run --detach ") {
+ if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
+ var stdinKV map[string]string
+ err := json.Unmarshal(stdinData, &stdinKV)
+ if err != nil {
+ fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
+ return 1
+ }
for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
- if env[name] == "" {
- fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
+ if stdinKV[name] == "" {
+				fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
return 1
}
}
svm.Unlock()
time.Sleep(svm.CrunchRunDetachDelay)
fmt.Fprintf(stderr, "starting %s\n", uuid)
- logger := logrus.WithFields(logrus.Fields{
+ logger := svm.sis.logger.WithFields(logrus.Fields{
"Instance": svm.id,
"ContainerUUID": uuid,
})
return si.addr
}
+func (si stubInstance) RemoteUser() string {
+ return si.svm.SSHService.AuthorizedUser
+}
+
func (si stubInstance) Destroy() error {
sis := si.svm.sis
if sis.driver.HoldCloudOps {
package worker
import (
+ "crypto/rand"
"errors"
+ "fmt"
"io"
"sort"
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
)
const (
- tagKeyInstanceType = "InstanceType"
- tagKeyIdleBehavior = "IdleBehavior"
+ tagKeyInstanceType = "InstanceType"
+ tagKeyIdleBehavior = "IdleBehavior"
+ tagKeyInstanceSecret = "InstanceSecret"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
Instance cloud.InstanceID `json:"instance"`
+ Address string `json:"address"`
Price float64 `json:"price"`
ArvadosInstanceType string `json:"arvados_instance_type"`
ProviderInstanceType string `json:"provider_instance_type"`
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
arvClient: arvClient,
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ installPublicKey: installPublicKey,
stop: make(chan bool),
}
wp.registerMetrics(reg)
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
+ installPublicKey ssh.PublicKey
// private state
subscribers map[<-chan struct{}]chan<- struct{}
throttleCreate throttle
throttleInstances throttle
- mInstances prometheus.Gauge
- mInstancesPrice prometheus.Gauge
mContainersRunning prometheus.Gauge
- mVCPUs prometheus.Gauge
- mVCPUsInuse prometheus.Gauge
- mMemory prometheus.Gauge
- mMemoryInuse prometheus.Gauge
+ mInstances *prometheus.GaugeVec
+ mInstancesPrice *prometheus.GaugeVec
+ mVCPUs *prometheus.GaugeVec
+ mMemory *prometheus.GaugeVec
}
// Subscribe returns a buffered channel that becomes ready after any
if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
return false
}
- tags := cloud.InstanceTags{
- tagKeyInstanceType: it.Name,
- tagKeyIdleBehavior: string(IdleBehaviorRun),
- }
now := time.Now()
wp.creating[it] = append(wp.creating[it], now)
go func() {
defer wp.notify()
- inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+ secret := randomHex(instanceSecretLength)
+ tags := cloud.InstanceTags{
+ tagKeyInstanceType: it.Name,
+ tagKeyIdleBehavior: string(IdleBehaviorRun),
+ tagKeyInstanceSecret: secret,
+ }
+ initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+ inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
// Remove our timestamp marker from wp.creating
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+ inst = tagVerifier{inst}
id := inst.ID()
if wkr := wp.workers[id]; wkr != nil {
wkr.executor.SetTarget(inst)
logger := wp.logger.WithFields(logrus.Fields{
"InstanceType": it.Name,
- "Instance": inst,
+ "Instance": inst.ID(),
+ "Address": inst.Address(),
})
logger.WithFields(logrus.Fields{
"State": initialState,
func (wp *Pool) kill(wkr *worker, uuid string) {
logger := wp.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
- "Instance": wkr.instance,
+ "Instance": wkr.instance.ID(),
})
logger.Debug("killing process")
- stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
+ cmd := "crunch-run --kill 15 " + uuid
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
logger.WithFields(logrus.Fields{
"stderr": string(stderr),
if reg == nil {
reg = prometheus.NewRegistry()
}
- wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "instances_total",
- Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
- })
- reg.MustRegister(wp.mInstances)
- wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "instances_price_total",
- Help: "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
- })
- reg.MustRegister(wp.mInstancesPrice)
wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Help: "Number of containers reported running by cloud VMs.",
})
reg.MustRegister(wp.mContainersRunning)
-
- wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_total",
+ Help: "Number of cloud VMs.",
+ }, []string{"category"})
+ reg.MustRegister(wp.mInstances)
+ wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_price",
+ Help: "Price of cloud VMs.",
+ }, []string{"category"})
+ reg.MustRegister(wp.mInstancesPrice)
+ wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "vcpus_total",
Help: "Total VCPUs on all cloud VMs.",
- })
+ }, []string{"category"})
reg.MustRegister(wp.mVCPUs)
- wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "vcpus_inuse",
- Help: "VCPUs on cloud VMs that are running containers.",
- })
- reg.MustRegister(wp.mVCPUsInuse)
- wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "memory_bytes_total",
Help: "Total memory on all cloud VMs.",
- })
+ }, []string{"category"})
reg.MustRegister(wp.mMemory)
- wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "memory_bytes_inuse",
- Help: "Memory on cloud VMs that are running containers.",
- })
- reg.MustRegister(wp.mMemoryInuse)
}
func (wp *Pool) runMetrics() {
ch := wp.Subscribe()
defer wp.Unsubscribe(ch)
+ wp.updateMetrics()
for range ch {
wp.updateMetrics()
}
wp.mtx.RLock()
defer wp.mtx.RUnlock()
- var price float64
- var alloc, cpu, cpuInuse, mem, memInuse int64
+ instances := map[string]int64{}
+ price := map[string]float64{}
+ cpu := map[string]int64{}
+ mem := map[string]int64{}
+ var running int64
for _, wkr := range wp.workers {
- price += wkr.instType.Price
- cpu += int64(wkr.instType.VCPUs)
- mem += int64(wkr.instType.RAM)
- if len(wkr.running)+len(wkr.starting) == 0 {
- continue
+ var cat string
+ switch {
+ case len(wkr.running)+len(wkr.starting) > 0:
+ cat = "inuse"
+ case wkr.idleBehavior == IdleBehaviorHold:
+ cat = "hold"
+ case wkr.state == StateBooting:
+ cat = "booting"
+ case wkr.state == StateUnknown:
+ cat = "unknown"
+ default:
+ cat = "idle"
}
- alloc += int64(len(wkr.running) + len(wkr.starting))
- cpuInuse += int64(wkr.instType.VCPUs)
- memInuse += int64(wkr.instType.RAM)
- }
- wp.mInstances.Set(float64(len(wp.workers)))
- wp.mInstancesPrice.Set(price)
- wp.mContainersRunning.Set(float64(alloc))
- wp.mVCPUs.Set(float64(cpu))
- wp.mMemory.Set(float64(mem))
- wp.mVCPUsInuse.Set(float64(cpuInuse))
- wp.mMemoryInuse.Set(float64(memInuse))
+ instances[cat]++
+ price[cat] += wkr.instType.Price
+ cpu[cat] += int64(wkr.instType.VCPUs)
+ mem[cat] += int64(wkr.instType.RAM)
+ running += int64(len(wkr.running) + len(wkr.starting))
+ }
+ for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
+ wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
+ wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
+ wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
+ wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+ }
+ wp.mContainersRunning.Set(float64(running))
}
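
For illustration only (not part of this diff): the per-category gauges above follow the standard Prometheus GaugeVec pattern, emitting one time series per "category" label value. A minimal standalone sketch, assuming the prometheus Go client and its testutil package:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Stand-in for wp.mInstances: one time series per "category" label value.
	mInstances := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category"})
	mInstances.WithLabelValues("idle").Set(3)
	mInstances.WithLabelValues("inuse").Set(2)
	// testutil.ToFloat64 collects a single metric and returns its value.
	fmt.Println(testutil.ToFloat64(mInstances.WithLabelValues("idle"))) // 3
}
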
func (wp *Pool) runProbes() {
for _, w := range wp.workers {
r = append(r, InstanceView{
Instance: w.instance.ID(),
+ Address: w.instance.Address(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
continue
}
logger := wp.logger.WithFields(logrus.Fields{
- "Instance": wkr.instance,
+ "Instance": wkr.instance.ID(),
"WorkerState": wkr.state,
})
logger.Info("instance disappeared in cloud")
go wp.notify()
}
}
+
+// Return a random string of n hexadecimal digits (n*4 random bits). n
+// must be even.
+func randomHex(n int) string {
+ buf := make([]byte, n/2)
+ _, err := rand.Read(buf)
+ if err != nil {
+ panic(err)
+ }
+ return fmt.Sprintf("%x", buf)
+}
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
}
}
- logger := test.Logger()
+ logger := ctxlog.TestLogger(c)
driver := &test.StubDriver{}
is, err := driver.InstanceSet(nil, "", logger)
c.Assert(err, check.IsNil)
},
}
- pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+ pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
notify := pool.Subscribe()
defer pool.Unsubscribe(notify)
pool.Create(type1)
c.Log("------- starting new pool, waiting to recover state")
- pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+ pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
notify2 := pool2.Subscribe()
defer pool2.Unsubscribe(notify2)
waitForIdle(pool2, notify2)
}
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
- logger := test.Logger()
+ logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}
instanceSet, err := driver.InstanceSet(nil, "", logger)
c.Assert(err, check.IsNil)
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "golang.org/x/crypto/ssh"
+)
+
+var (
+ errBadInstanceSecret = errors.New("bad instance secret")
+
+	// path of the secret file on the instance; interpolated directly into shell commands, so it must not need quoting
+ instanceSecretFilename = "/var/run/arvados-instance-secret"
+ instanceSecretLength = 40 // hex digits
+)
+
+type tagVerifier struct {
+ cloud.Instance
+}
+
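+// VerifyHostKey implements the per-instance secret check: the
+// dispatcher stores a random secret in the instance's tags and, via
+// the boot-time InitCommand, writes the same secret to
+// instanceSecretFilename on the instance; here it reads the file back
+// over SSH and compares it with the tag before trusting the host.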
+func (tv tagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) error {
+ expectSecret := tv.Instance.Tags()[tagKeyInstanceSecret]
+ if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || expectSecret == "" {
+		// If the wrapped instance knows how to verify the key, or
+		// there is no secret tag to check against, return the
+		// wrapped instance's decision.
+ return err
+ }
+ session, err := client.NewSession()
+ if err != nil {
+ return err
+ }
+ defer session.Close()
+ var stdout, stderr bytes.Buffer
+ session.Stdin = bytes.NewBuffer(nil)
+ session.Stdout = &stdout
+ session.Stderr = &stderr
+ cmd := fmt.Sprintf("cat %s", instanceSecretFilename)
+ if u := tv.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
+ err = session.Run(cmd)
+ if err != nil {
+ return err
+ }
+ if stdout.String() != expectSecret {
+ return errBadInstanceSecret
+ }
+ return nil
+}
import (
"bytes"
+ "encoding/json"
"fmt"
"strings"
"sync"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
"github.com/sirupsen/logrus"
)
"ContainerUUID": ctr.UUID,
"Priority": ctr.Priority,
})
- logger = logger.WithField("Instance", wkr.instance)
+ logger = logger.WithField("Instance", wkr.instance.ID())
logger.Debug("starting container")
wkr.starting[ctr.UUID] = struct{}{}
wkr.state = StateRunning
"ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
}
- stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
+ if wkr.wp.arvClient.Insecure {
+ env["ARVADOS_API_HOST_INSECURE"] = "1"
+ }
+ envJSON, err := json.Marshal(env)
+ if err != nil {
+ panic(err)
+ }
+ stdin := bytes.NewBuffer(envJSON)
+ cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
now := time.Now()
func (wkr *worker) probeRunning() (running []string, ok bool) {
cmd := "crunch-run --list"
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
wkr.logger.WithFields(logrus.Fields{
"State": wkr.state,
- "Age": age,
+ "IdleDuration": stats.Duration(age),
"IdleBehavior": wkr.idleBehavior,
}).Info("shutdown idle worker")
wkr.shutdown()
// match. Caller must have lock.
func (wkr *worker) saveTags() {
instance := wkr.instance
- have := instance.Tags()
- want := cloud.InstanceTags{
+ tags := instance.Tags()
+ update := cloud.InstanceTags{
tagKeyInstanceType: wkr.instType.Name,
tagKeyIdleBehavior: string(wkr.idleBehavior),
}
- go func() {
- for k, v := range want {
- if v == have[k] {
- continue
- }
- err := instance.SetTags(want)
+ save := false
+ for k, v := range update {
+ if tags[k] != v {
+ tags[k] = v
+ save = true
+ }
+ }
+ if save {
+ go func() {
+ err := instance.SetTags(tags)
if err != nil {
- wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+ wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
}
- break
-
- }
- }()
+ }()
+ }
}
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
)
type WorkerSuite struct{}
func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
- logger := test.Logger()
+ logger := ctxlog.TestLogger(c)
bootTimeout := time.Minute
probeTimeout := time.Second
is, err := (&test.StubDriver{}).InstanceSet(nil, "", logger)
c.Assert(err, check.IsNil)
- inst, err := is.Create(arvados.InstanceType{}, "", nil, nil)
+ inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
c.Assert(err, check.IsNil)
type trialT struct {
package service
import (
+ "context"
"flag"
"fmt"
"io"
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"github.com/coreos/go-systemd/daemon"
"github.com/sirupsen/logrus"
CheckHealth() error
}
-type NewHandlerFunc func(*arvados.Cluster, *arvados.NodeProfile) Handler
+type NewHandlerFunc func(context.Context, *arvados.Cluster, *arvados.NodeProfile) Handler
type command struct {
newHandler NewHandlerFunc
}
func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
- log := logrus.New()
- log.Formatter = &logrus.JSONFormatter{
- TimestampFormat: rfc3339NanoFixed,
- }
- log.Out = stderr
+ log := ctxlog.New(stderr, "json", "info")
var err error
defer func() {
if err != nil {
return 1
}
+ log = ctxlog.New(stderr, cluster.Logging.Format, cluster.Logging.Level).WithFields(logrus.Fields{
+ "PID": os.Getpid(),
+ })
+ ctx := ctxlog.Context(context.Background(), log)
profileName := *nodeProfile
if profileName == "" {
profileName = os.Getenv("ARVADOS_NODE_PROFILE")
err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName)
return 1
}
- handler := c.newHandler(cluster, profile)
+ handler := c.newHandler(ctx, cluster, profile)
if err = handler.CheckHealth(); err != nil {
return 1
}
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
- except Exception as e:
- logger.error(e)
+ except Exception:
+ logger.exception("Error creating the Arvados CWL Executor")
return 1
+ # Note that unless in debug mode, some stack traces related to user
+ # workflow errors may be suppressed. See ArvadosJob.done().
if arvargs.debug:
logger.setLevel(logging.DEBUG)
logging.getLogger('arvados').setLevel(logging.DEBUG)
logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
else:
logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
- except Exception as e:
- logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
+ except Exception:
+ logger.exception("%s got an error", self.arvrunner.label(self))
self.output_callback({}, "permanentFail")
def done(self, record):
if record["output_uuid"]:
if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
# Compute the trash time to avoid requesting the collection record.
- trash_at = ciso8601.parse_datetime_unaware(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
+ trash_at = ciso8601.parse_datetime(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
if container["output"]:
outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
except WorkflowException as e:
+ # Only include a stack trace if in debug mode.
+ # A stack trace may obfuscate more useful output about the workflow.
logger.error("%s unable to collect output from %s:\n%s",
self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
- except Exception as e:
- logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
+ except Exception:
+ logger.exception("%s while getting output object:", self.arvrunner.label(self))
processStatus = "permanentFail"
finally:
self.output_callback(outputs, processStatus)
container = self.arvrunner.api.containers().get(
uuid=record["container_uuid"]
).execute(num_retries=self.arvrunner.num_retries)
- except Exception as e:
- logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e)
+ except Exception:
+ logger.exception("%s while getting runner container", self.arvrunner.label(self))
self.arvrunner.output_callback({}, "permanentFail")
else:
super(RunnerContainer, self).done(container)
arvados.commands.put.api_client = api_client
arvados.commands.keepdocker.main(args, stdout=sys.stderr, install_sig_handlers=False, api=api_client)
except SystemExit as e:
+ # If e.code is None or zero, then keepdocker exited normally and we can continue
if e.code:
raise WorkflowException("keepdocker exited with code %s" % e.code)
e)
else:
logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
- except Exception as e:
+ except Exception:
logger.exception("%s error" % (self.arvrunner.label(self)))
self.output_callback({}, "permanentFail")
body={
"components": components
}).execute(num_retries=self.arvrunner.num_retries)
- except Exception as e:
- logger.info("Error adding to components: %s", e)
+ except Exception:
+ logger.exception("Error adding to components")
def done(self, record):
try:
outputs = done.done(self, record, dirs["tmpdir"],
dirs["outdir"], dirs["keep"])
except WorkflowException as e:
+ # Only include a stack trace if in debug mode.
+ # This is most likely a user workflow error and a stack trace may obfuscate more useful output.
logger.error("%s unable to collect output from %s:\n%s",
self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
- except Exception as e:
+ except Exception:
logger.exception("Got unknown exception while collecting output for job %s:", self.name)
processStatus = "permanentFail"
def __init__(self, runtime_status_update_func):
super(RuntimeStatusLoggingHandler, self).__init__()
self.runtime_status_update = runtime_status_update_func
+ self.updatingRuntimeStatus = False
def emit(self, record):
kind = None
kind = 'error'
elif record.levelno >= logging.WARNING:
kind = 'warning'
- if kind is not None:
- log_msg = record.getMessage()
- if '\n' in log_msg:
- # If the logged message is multi-line, use its first line as status
- # and the rest as detail.
- status, detail = log_msg.split('\n', 1)
- self.runtime_status_update(
- kind,
- "%s: %s" % (record.name, status),
- detail
- )
- else:
- self.runtime_status_update(
- kind,
- "%s: %s" % (record.name, record.getMessage())
- )
+        if kind is not None and not self.updatingRuntimeStatus:
+ self.updatingRuntimeStatus = True
+ try:
+ log_msg = record.getMessage()
+ if '\n' in log_msg:
+ # If the logged message is multi-line, use its first line as status
+ # and the rest as detail.
+ status, detail = log_msg.split('\n', 1)
+ self.runtime_status_update(
+ kind,
+ "%s: %s" % (record.name, status),
+ detail
+ )
+ else:
+ self.runtime_status_update(
+ kind,
+ "%s: %s" % (record.name, record.getMessage())
+ )
+ finally:
+ self.updatingRuntimeStatus = False
+
class ArvCwlExecutor(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
keys = keys[pageSize:]
try:
proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.warning("Error checking states on API server: %s", e)
+ except Exception:
+            logger.exception("Error checking states on API server")
remain_wait = self.poll_interval
continue
for i in self.intermediate_output_collections:
try:
self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
- except:
+ except Exception:
logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
- if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+ except (KeyboardInterrupt, SystemExit):
break
def check_features(self, obj):
body={
'is_trashed': True
}).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.info("Setting container output: %s", e)
+ except Exception:
+ logger.exception("Setting container output")
+ return
elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
body={
except:
if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
logger.error("Interrupted, workflow will be cancelled")
+ elif isinstance(sys.exc_info()[1], WorkflowException):
+ logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
else:
- logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ logger.exception("Workflow execution failed")
+
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
return True
except arvados.errors.NotFoundError:
return False
- except:
- logger.exception("Got unexpected exception checking if file exists:")
+ except Exception:
+ logger.exception("Got unexpected exception checking if file exists")
return False
return super(CollectionFetcher, self).check_exists(url)
from future.utils import viewvalues, viewitems
import os
+import sys
import urllib.parse
from functools import partial
import logging
import json
-import subprocess32 as subprocess
from collections import namedtuple
-
from io import StringIO
+if os.name == "posix" and sys.version_info[0] < 3:
+ import subprocess32 as subprocess
+else:
+ import subprocess
+
from schema_salad.sourceline import SourceLine, cmap
from cwltool.command_line_tool import CommandLineTool
fileobj["location"] = "keep:%s/%s" % (record["output"], path)
adjustFileObjs(outputs, keepify)
adjustDirObjs(outputs, keepify)
- except Exception as e:
- logger.exception("[%s] While getting final output object: %s", self.name, e)
+ except Exception:
+ logger.exception("[%s] While getting final output object", self.name)
self.arvrunner.output_callback({}, "permanentFail")
else:
self.arvrunner.output_callback(outputs, processStatus)
'ruamel.yaml >=0.15.54, <=0.15.77',
'arvados-python-client>=1.3.0.20190205182514',
'setuptools',
- 'ciso8601 >=1.0.6, <2.0.0',
- 'subprocess32>=3.5.1',
+ 'ciso8601 >= 2.0.0',
],
+ extras_require={
+ ':os.name=="posix" and python_version<"3"': ['subprocess32 >= 3.5.1'],
+ ':python_version<"3"': ['pytz'],
+ },
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
],
return loadingContext, runtimeContext
+ # Helper function to set up the ArvCwlExecutor to use the containers api
+ # and test that the RuntimeStatusLoggingHandler is set up correctly
+    def setup_and_test_container_executor_and_logging(self, gcc_mock):
+ api = mock.MagicMock()
+ api._rootDesc = copy.deepcopy(get_rootDesc())
+ del api._rootDesc.get('resources')['jobs']['methods']['create']
+
+ # Make sure ArvCwlExecutor thinks it's running inside a container so it
+ # adds the logging handler that will call runtime_status_update() mock
+ self.assertFalse(gcc_mock.called)
+ runner = arvados_cwl.ArvCwlExecutor(api)
+ self.assertEqual(runner.work_api, 'containers')
+ root_logger = logging.getLogger('')
+ handlerClasses = [h.__class__ for h in root_logger.handlers]
+ self.assertTrue(arvados_cwl.RuntimeStatusLoggingHandler in handlerClasses)
+ return runner
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
+    # Test to make sure we don't call runtime_status_update if we already did
+    # somewhere higher up in the call stack
@mock.patch("arvados_cwl.util.get_current_container")
- @mock.patch("arvados.collection.CollectionReader")
- @mock.patch("arvados.collection.Collection")
- def test_child_failure(self, col, reader, gcc_mock):
- api = mock.MagicMock()
- api._rootDesc = copy.deepcopy(get_rootDesc())
- del api._rootDesc.get('resources')['jobs']['methods']['create']
+ def test_recursive_runtime_status_update(self, gcc_mock):
+ self.setup_and_test_container_executor_and_logging(gcc_mock)
+ root_logger = logging.getLogger('')
- # Set up runner with mocked runtime_status_update()
- self.assertFalse(gcc_mock.called)
- runtime_status_update = mock.MagicMock()
- arvados_cwl.ArvCwlExecutor.runtime_status_update = runtime_status_update
- runner = arvados_cwl.ArvCwlExecutor(api)
- self.assertEqual(runner.work_api, 'containers')
+        # get_current_container is invoked when we call runtime_status_update,
+        # so have it log again to trigger a recursive update
+ gcc_mock.side_effect = lambda *args: root_logger.error("Second Error")
+ try:
+ root_logger.error("First Error")
+ except RuntimeError:
+ self.fail("RuntimeStatusLoggingHandler should not be called recursively")
- # Make sure ArvCwlExecutor thinks it's running inside a container so it
- # adds the logging handler that will call runtime_status_update() mock
+ @mock.patch("arvados_cwl.ArvCwlExecutor.runtime_status_update")
+ @mock.patch("arvados_cwl.util.get_current_container")
+ @mock.patch("arvados.collection.CollectionReader")
+ @mock.patch("arvados.collection.Collection")
+ def test_child_failure(self, col, reader, gcc_mock, rts_mock):
+ runner = self.setup_and_test_container_executor_and_logging(gcc_mock)
+
gcc_mock.return_value = {"uuid" : "zzzzz-dz642-zzzzzzzzzzzzzzz"}
self.assertTrue(gcc_mock.called)
- root_logger = logging.getLogger('')
- handlerClasses = [h.__class__ for h in root_logger.handlers]
- self.assertTrue(arvados_cwl.RuntimeStatusLoggingHandler in handlerClasses)
runner.num_retries = 0
runner.ignore_docker_for_reuse = False
"modified_at": "2017-05-26T12:01:22Z"
})
- runtime_status_update.assert_called_with(
+ rts_mock.assert_called_with(
'error',
'arvados.cwl-runner: [container testjob] (zzzzz-xvhdp-zzzzzzzzzzzzzzz) error log:',
' ** log is empty **'
RemoteClusters map[string]RemoteCluster
PostgreSQL PostgreSQL
RequestLimits RequestLimits
+ Logging Logging
+}
+
+type Logging struct {
+ Level string
+ Format string
}
type PostgreSQL struct {
if _, ok := (*it)[t.Name]; ok {
return errDuplicateInstanceTypeName
}
+ if t.ProviderType == "" {
+ t.ProviderType = t.Name
+ }
(*it)[t.Name] = t
}
return nil
if err != nil {
return err
}
- // Fill in Name field using hash key.
+ // Fill in Name field (and ProviderType field, if not
+ // specified) using hash key.
*it = InstanceTypeMap(hash)
for name, t := range *it {
t.Name = name
+ if t.ProviderType == "" {
+ t.ProviderType = name
+ }
(*it)[name] = t
}
return nil
package ctxlog
import (
+ "bytes"
"context"
+ "io"
+ "os"
"github.com/sirupsen/logrus"
+ check "gopkg.in/check.v1"
)
var (
// Context returns a new child context such that FromContext(child)
// returns the given logger.
-func Context(ctx context.Context, logger *logrus.Entry) context.Context {
+func Context(ctx context.Context, logger logrus.FieldLogger) context.Context {
return context.WithValue(ctx, loggerCtxKey, logger)
}
// FromContext returns the logger suitable for the given context -- the one
// attached by contextWithLogger() if applicable, otherwise the
// top-level logger with no fields/values.
-func FromContext(ctx context.Context) *logrus.Entry {
+func FromContext(ctx context.Context) logrus.FieldLogger {
if ctx != nil {
- if logger, ok := ctx.Value(loggerCtxKey).(*logrus.Entry); ok {
+ if logger, ok := ctx.Value(loggerCtxKey).(logrus.FieldLogger); ok {
return logger
}
}
return rootLogger.WithFields(nil)
}
+// New returns a new logger with the indicated format and
+// level.
+func New(out io.Writer, format, level string) logrus.FieldLogger {
+ logger := logrus.New()
+ logger.Out = out
+ setFormat(logger, format)
+ setLevel(logger, level)
+ return logger
+}
+
+func TestLogger(c *check.C) logrus.FieldLogger {
+ logger := logrus.New()
+ logger.Out = &logWriter{c.Log}
+ setFormat(logger, "text")
+ if d := os.Getenv("ARVADOS_DEBUG"); d != "0" && d != "" {
+ setLevel(logger, "debug")
+ } else {
+ setLevel(logger, "info")
+ }
+ return logger
+}
+
// SetLevel sets the current logging level. See logrus for level
// names.
func SetLevel(level string) {
- lvl, err := logrus.ParseLevel(level)
- if err != nil {
- logrus.Fatal(err)
+ setLevel(rootLogger, level)
+}
+
+func setLevel(logger *logrus.Logger, level string) {
+ if level == "" {
+ } else if lvl, err := logrus.ParseLevel(level); err != nil {
+ logrus.WithField("Level", level).Fatal("unknown log level")
+ } else {
+ logger.Level = lvl
}
- rootLogger.Level = lvl
}
// SetFormat sets the current logging format to "json" or "text".
func SetFormat(format string) {
+ setFormat(rootLogger, format)
+}
+
+func setFormat(logger *logrus.Logger, format string) {
switch format {
case "text":
- rootLogger.Formatter = &logrus.TextFormatter{
+ logger.Formatter = &logrus.TextFormatter{
FullTimestamp: true,
TimestampFormat: rfc3339NanoFixed,
}
- case "json":
- rootLogger.Formatter = &logrus.JSONFormatter{
+ case "json", "":
+ logger.Formatter = &logrus.JSONFormatter{
TimestampFormat: rfc3339NanoFixed,
}
default:
- logrus.WithField("LogFormat", format).Fatal("unknown log format")
+ logrus.WithField("Format", format).Fatal("unknown log format")
}
}
+
+// logWriter is an io.Writer that writes by calling a "write log"
+// function, typically (*check.C).Log().
+type logWriter struct {
+ logfunc func(...interface{})
+}
+
+func (tl *logWriter) Write(buf []byte) (int, error) {
+ tl.logfunc(string(bytes.TrimRight(buf, "\n")))
+ return len(buf), nil
+}
}
}
arvadosRootUrl = "https://" + arvadosApiHost;
- arvadosRootUrl += (arvadosApiHost.endsWith("/")) ? "" : "/";
if (hostInsecure != null) {
arvadosApiHostInsecure = Boolean.valueOf(hostInsecure);
if streamoffset == current_span[1]:
current_span[1] += segment.segment_size
else:
- stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+ stream_tokens.append(u"{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
current_span = [streamoffset, streamoffset + segment.segment_size]
if current_span is not None:
- stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+ stream_tokens.append(u"{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
if not stream[streamfile]:
- stream_tokens.append("0:0:{0}".format(fout))
+ stream_tokens.append(u"0:0:{0}".format(fout))
return stream_tokens
streampath, filename = split(streampath)
if self._last_open and not self._last_open.closed:
raise errors.AssertionError(
- "can't open '{}' when '{}' is still open".format(
+ u"can't open '{}' when '{}' is still open".format(
filename, self._last_open.name))
if streampath != self.current_stream_name():
self.start_new_stream(streampath)
writer._queued_file.seek(pos)
except IOError as error:
raise errors.StaleWriterStateError(
- "failed to reopen active file {}: {}".format(path, error))
+ u"failed to reopen active file {}: {}".format(path, error))
return writer
def check_dependencies(self):
for path, orig_stat in listitems(self._dependencies):
if not S_ISREG(orig_stat[ST_MODE]):
- raise errors.StaleWriterStateError("{} not file".format(path))
+ raise errors.StaleWriterStateError(u"{} not file".format(path))
try:
now_stat = tuple(os.stat(path))
except OSError as error:
raise errors.StaleWriterStateError(
- "failed to stat {}: {}".format(path, error))
+ u"failed to stat {}: {}".format(path, error))
if ((not S_ISREG(now_stat[ST_MODE])) or
(orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
(orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
- raise errors.StaleWriterStateError("{} changed".format(path))
+ raise errors.StaleWriterStateError(u"{} changed".format(path))
def dump_state(self, copy_func=lambda x: x):
state = {attr: copy_func(getattr(self, attr))
try:
src_path = os.path.realpath(source)
except Exception:
- raise errors.AssertionError("{} not a file path".format(source))
+ raise errors.AssertionError(u"{} not a file path".format(source))
try:
path_stat = os.stat(src_path)
except OSError as stat_error:
self._dependencies[source] = tuple(fd_stat)
elif path_stat is None:
raise errors.AssertionError(
- "could not stat {}: {}".format(source, stat_error))
+ u"could not stat {}: {}".format(source, stat_error))
elif path_stat.st_ino != fd_stat.st_ino:
raise errors.AssertionError(
- "{} changed between open and stat calls".format(source))
+ u"{} changed between open and stat calls".format(source))
else:
self._dependencies[src_path] = tuple(fd_stat)
def get_trash_at(self):
if self._api_response and self._api_response["trash_at"]:
- return ciso8601.parse_datetime(self._api_response["trash_at"])
+ try:
+ return ciso8601.parse_datetime(self._api_response["trash_at"])
+ except ValueError:
+ return None
else:
return None
import json
import os
import re
-import subprocess32 as subprocess
import sys
import tarfile
import tempfile
import shutil
import _strptime
import fcntl
-
from operator import itemgetter
from stat import *
+if os.name == "posix" and sys.version_info[0] < 3:
+ import subprocess32 as subprocess
+else:
+ import subprocess
+
import arvados
import arvados.util
import arvados.commands._util as arv_cmd
Docker metadata links to sort them from least to most preferred.
"""
try:
- image_timestamp = ciso8601.parse_datetime_unaware(
+ image_timestamp = ciso8601.parse_datetime(
link['properties']['image_timestamp'])
except (KeyError, ValueError):
image_timestamp = EARLIEST_DATETIME
- return (image_timestamp,
- ciso8601.parse_datetime_unaware(link['created_at']))
+ try:
+ created_timestamp = ciso8601.parse_datetime(link['created_at'])
+ except ValueError:
+ created_timestamp = None
+ return (image_timestamp, created_timestamp)
def _get_docker_links(api_client, num_retries, **kwargs):
links = arvados.util.list_all(api_client.links().list,
try:
fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
- raise ResumeCacheConflict("{} locked".format(fileobj.name))
+ raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
def load(self):
self.cache_file.seek(0)
raise ArvPutUploadIsPending()
self._write_stdin(self.filename or 'stdin')
elif not os.path.exists(path):
- raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
+ raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
elif os.path.isdir(path):
# Use absolute paths on cache index so CWD doesn't interfere
# with the caching logic.
elif file_in_local_collection.permission_expired():
# Permission token expired, re-upload file. This will change whenever
# we have a API for refreshing tokens.
- self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
+ self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
should_upload = True
self._local_collection.remove(filename)
elif cached_file_data['size'] == file_in_local_collection.size():
# Inconsistent cache, re-upload the file
should_upload = True
self._local_collection.remove(filename)
- self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+ self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
# Local file differs from cached data, re-upload it.
else:
if file_in_local_collection:
if self.use_cache:
cache_filepath = self._get_cache_filepath()
if self.resume and os.path.exists(cache_filepath):
- self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
+ self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'a+')
else:
# --no-resume means start with a empty cache file.
- self.logger.info("Creating new cache file at {}".format(cache_filepath))
+ self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'w+')
self._cache_filename = self._cache_file.name
self._lock_file(self._cache_file)
try:
fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
- raise ResumeCacheConflict("{} locked".format(fileobj.name))
+ raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
def _save_state(self):
"""
else:
try:
if args.update_collection:
- logger.info("Collection updated: '{}'".format(writer.collection_name()))
+ logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
else:
- logger.info("Collection saved as '{}'".format(writer.collection_name()))
+ logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
if args.portable_data_hash:
output = writer.portable_data_hash()
else:
('share/doc/arvados-python-client', ['LICENSE-2.0.txt', 'README.rst']),
],
install_requires=[
- 'ciso8601 >=1.0.6, <2.0.0',
+ 'ciso8601 >=2.0.0',
'future',
'google-api-python-client >=1.6.2, <1.7',
'httplib2 >=0.9.2',
'ruamel.yaml >=0.15.54, <=0.15.77',
'setuptools',
'ws4py >=0.4.2',
- 'subprocess32 >=3.5.1',
],
+ extras_require={
+ ':os.name=="posix" and python_version<"3"': ['subprocess32 >= 3.5.1'],
+ ':python_version<"3"': ['pytz'],
+ },
classifiers=[
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 3',
+# -*- coding: utf-8 -*-
+
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
+ def test_unicode_on_filename(self):
+ tmpdir = self.make_tmpdir()
+ fname = u"i❤arvados.txt"
+ with open(os.path.join(tmpdir, fname), 'w') as f:
+ f.write("This is a unicode named file")
+ col = self.run_and_find_collection("", ['--no-progress', tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
+
def test_silent_mode_no_errors(self):
self.authorize_with('active')
tmpdir = self.make_tmpdir()
end
def add_copy(src_item, key)
- self[key] = src_item.copy_named("#{path}/#{key}")
+ if key == "."
+ self[key] = src_item.copy_named("#{path}")
+ else
+ self[key] = src_item.copy_named("#{path}/#{key}")
+ end
end
def merge(src_item, key)
items["."] = CollectionStream.new(".")
end
+ def add_copy(src_item, key)
+ items["."].add_copy(src_item, key)
+ end
+
def raise_root_write_error(key)
raise ArgumentError.new("can't write to %p at collection root" % key)
end
dst_coll.manifest_text)
end
+ def test_copy_root_into_empty_collection
+ block = random_block(8)
+ src_coll = Arv::Collection.new(". #{block} 0:8:f1\n")
+ dst_coll = Arv::Collection.new()
+ dst_coll.cp_r("./", ".", src_coll)
+ assert_equal(". %s 0:8:f1\n" %
+ [block],
+ dst_coll.manifest_text)
+ end
+
def test_copy_empty_source_path_raises_ArgumentError(src="", dst="./s1")
coll = Arv::Collection.new(SIMPLEST_MANIFEST)
assert_raises(ArgumentError) do
# Generated git-commit.version file
/git-commit.version
+
+# Generated when building distribution packages
+/package-build.version
else
kwargs = {}
end
+ if users_list.select { |u| u.is_admin }.any?
+ return super
+ end
Container.where(ContainerRequest.readable_by(*users_list).where("containers.uuid = container_requests.container_uuid").exists)
end
assert_equal 1, Container.readable_by(users(:active)).where(state: "Queued").count
end
+ test "Containers with no matching request are readable by admin" do
+ uuids = Container.includes('container_requests').where(container_requests: {uuid: nil}).collect(&:uuid)
+ assert_not_empty uuids
+ assert_empty Container.readable_by(users(:active)).where(uuid: uuids)
+ assert_not_empty Container.readable_by(users(:admin)).where(uuid: uuids)
+ assert_equal uuids.count, Container.readable_by(users(:admin)).where(uuid: uuids).count
+ end
+
test "Container locked cancel" do
set_user_from_auth :active
c, _ = minimal_new
"encoding/json"
"fmt"
"io"
- "io/ioutil"
"os"
"os/exec"
"path/filepath"
// procinfo is saved in each process's lockfile.
type procinfo struct {
- UUID string
- PID int
- Stdout string
- Stderr string
+ UUID string
+ PID int
}
// Detach acquires a lock for the given uuid, and starts the current
// program as a child process (with -no-detach prepended to the given
// arguments so the child knows not to detach again). The lock is
// passed along to the child process.
+//
+// Stdout and stderr in the child process are sent to the systemd
+// journal using the systemd-cat program.
func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
return exitcode(stderr, detach(uuid, args, stdout, stderr))
}
return nil, err
}
defer dirlock.Close()
- lockfile, err := os.OpenFile(filepath.Join(lockdir, lockprefix+uuid+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+ lockfilename := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
+ lockfile, err := os.OpenFile(lockfilename, os.O_CREATE|os.O_RDWR, 0700)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("open %s: %s", lockfilename, err)
}
err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
lockfile.Close()
- return nil, err
+ return nil, fmt.Errorf("lock %s: %s", lockfilename, err)
}
return lockfile, nil
}()
defer lockfile.Close()
lockfile.Truncate(0)
- outfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stdout-")
- if err != nil {
- return err
- }
- defer outfile.Close()
- errfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stderr-")
- if err != nil {
- os.Remove(outfile.Name())
- return err
- }
- defer errfile.Close()
-
- cmd := exec.Command(args[0], append([]string{"-no-detach"}, args[1:]...)...)
- cmd.Stdout = outfile
- cmd.Stderr = errfile
+ cmd := exec.Command("systemd-cat", append([]string{"--identifier=crunch-run", args[0], "-no-detach"}, args[1:]...)...)
// Child inherits lockfile.
cmd.ExtraFiles = []*os.File{lockfile}
// Ensure child isn't interrupted even if we receive signals
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
err = cmd.Start()
if err != nil {
- os.Remove(outfile.Name())
- os.Remove(errfile.Name())
- return err
+ return fmt.Errorf("exec %s: %s", cmd.Path, err)
}
w := io.MultiWriter(stdout, lockfile)
- err = json.NewEncoder(w).Encode(procinfo{
- UUID: uuid,
- PID: cmd.Process.Pid,
- Stdout: outfile.Name(),
- Stderr: errfile.Name(),
+ return json.NewEncoder(w).Encode(procinfo{
+ UUID: uuid,
+ PID: cmd.Process.Pid,
})
- if err != nil {
- os.Remove(outfile.Name())
- os.Remove(errfile.Name())
- return err
- }
- return nil
}
// KillProcess finds the crunch-run process corresponding to the given
if os.IsNotExist(err) {
return nil
} else if err != nil {
- return err
+ return fmt.Errorf("open %s: %s", path, err)
}
defer f.Close()
var pi procinfo
err = json.NewDecoder(f).Decode(&pi)
if err != nil {
- return fmt.Errorf("%s: %s\n", path, err)
+		return fmt.Errorf("decode %s: %s", path, err)
}
if pi.UUID != uuid || pi.PID == 0 {
proc, err := os.FindProcess(pi.PID)
if err != nil {
- return err
+ return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
}
err = proc.Signal(signal)
err = proc.Signal(syscall.Signal(0))
}
if err == nil {
- return fmt.Errorf("pid %d: sent signal %d (%s) but process is still alive", pi.PID, signal, signal)
+ return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
}
- fmt.Fprintf(stderr, "pid %d: %s\n", pi.PID, err)
+ fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
return nil
}
// List UUIDs of active crunch-run processes.
func ListProcesses(stdout, stderr io.Writer) int {
- return exitcode(stderr, filepath.Walk(lockdir, func(path string, info os.FileInfo, err error) error {
- if info.IsDir() {
+ // filepath.Walk does not follow symlinks, so we must walk
+ // lockdir+"/." in case lockdir itself is a symlink.
+ walkdir := lockdir + "/."
+ return exitcode(stderr, filepath.Walk(walkdir, func(path string, info os.FileInfo, err error) error {
+ if info.IsDir() && path != walkdir {
return filepath.SkipDir
}
if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
err := os.Remove(path)
dirlock.Close()
if err != nil {
- fmt.Fprintln(stderr, err)
+ fmt.Fprintf(stderr, "unlink %s: %s\n", f.Name(), err)
}
return nil
}
//
// Caller releases the lock by closing the returned file.
func lockall() (*os.File, error) {
- f, err := os.OpenFile(filepath.Join(lockdir, lockprefix+"all"+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+ lockfile := filepath.Join(lockdir, lockprefix+"all"+locksuffix)
+ f, err := os.OpenFile(lockfile, os.O_CREATE|os.O_RDWR, 0700)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("open %s: %s", lockfile, err)
}
err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
if err != nil {
f.Close()
- return nil, err
+ return nil, fmt.Errorf("lock %s: %s", lockfile, err)
}
return f, nil
}
cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+ stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
flag.Parse()
+ if *stdinEnv && !ignoreDetachFlag {
+ // Load env vars on stdin if asked (but not in a
+ // detached child process, in which case stdin is
+ // /dev/null).
+ loadEnv(os.Stdin)
+ }
+
switch {
case *detach && !ignoreDetachFlag:
os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
log.Fatalf("%s: %v", containerId, runerr)
}
}
+
+func loadEnv(rdr io.Reader) {
+ buf, err := ioutil.ReadAll(rdr)
+ if err != nil {
+ log.Fatalf("read stdin: %s", err)
+ }
+ var env map[string]string
+ err = json.Unmarshal(buf, &env)
+ if err != nil {
+ log.Fatalf("decode stdin: %s", err)
+ }
+ for k, v := range env {
+ err = os.Setenv(k, v)
+ if err != nil {
+ log.Fatalf("setenv(%q): %s", k, err)
+ }
+ }
+}
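
Illustrative caller-side sketch (assumptions: a crunch-run binary on PATH, and the container UUID, API host, and token below are placeholders): the -stdin-env flag expects a single JSON object on stdin, which loadEnv() above decodes and applies with os.Setenv before the container is dispatched.

package main

import (
	"encoding/json"
	"log"
	"os"
	"os/exec"
)

func main() {
	// Placeholder container UUID; -stdin-env tells crunch-run to read
	// environment variables from stdin before doing anything else.
	cmd := exec.Command("crunch-run", "-stdin-env", "zzzzz-dz642-xxxxxxxxxxxxxxx")
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	stdin, err := cmd.StdinPipe()
	if err != nil {
		log.Fatalf("stdin pipe: %s", err)
	}
	if err := cmd.Start(); err != nil {
		log.Fatalf("start: %s", err)
	}
	// loadEnv() expects one JSON object mapping names to values.
	err = json.NewEncoder(stdin).Encode(map[string]string{
		"ARVADOS_API_HOST":  "zzzzz.example.org",
		"ARVADOS_API_TOKEN": "exampletokenexampletoken",
	})
	if err != nil {
		log.Fatalf("encode env: %s", err)
	}
	stdin.Close()
	if err := cmd.Wait(); err != nil {
		log.Fatalf("crunch-run: %s", err)
	}
}
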
if not t:
return 0
try:
- return calendar.timegm(ciso8601.parse_datetime_unaware(t).timetuple())
+ return calendar.timegm(ciso8601.parse_datetime(t).timetuple())
except (TypeError, ValueError):
return 0
# llfuse 1.3.4 fails to install via pip
'llfuse >=1.2, <1.3.4',
'python-daemon',
- 'ciso8601 >=1.0.6, <2.0.0',
+ 'ciso8601 >= 2.0.0',
'setuptools'
],
+ extras_require={
+ ':python_version<"3"': ['pytz'],
+ },
test_suite='tests',
tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
zip_safe=False
switch {
case err == nil:
return err
+ case strings.Contains(err.Error(), "StatusCode=503"):
+ // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804)
+ return VolumeBusyError
case strings.Contains(err.Error(), "Not Found"):
// "storage: service returned without a response body (404 Not Found)"
return os.ErrNotExist
// - permissions on, authenticated request, unsigned locator
// - permissions on, unauthenticated request, signed locator
// - permissions on, authenticated request, expired locator
+// - permissions on, authenticated request, signed locator, transient error from backend
//
func TestGetHandler(t *testing.T) {
defer teardown()
ExpectStatusCode(t,
"Authenticated request, expired locator",
ExpiredError.HTTPCode, response)
+
+ // Authenticated request, signed locator
+ // => 503 Server busy (transient error)
+
+ // Set up the block owning volume to respond with errors
+ vols[0].(*MockVolume).Bad = true
+ vols[0].(*MockVolume).BadVolumeError = VolumeBusyError
+ response = IssueRequest(&RequestTester{
+ method: "GET",
+ uri: signedLocator,
+ apiToken: knownToken,
+ })
+ // A transient error from one volume while the other doesn't find the block
+ // should make the service return a 503 so that clients can retry.
+ ExpectStatusCode(t,
+ "Volume backend busy",
+ 503, response)
}
// Test PutBlockHandler on the following situations:
if !os.IsNotExist(err) {
log.Printf("%s: Get(%s): %s", vol, hash, err)
}
+		// If some volume returns a transient error, return it to the caller
+		// instead of "Not found" so the caller can retry.
+ if err == VolumeBusyError {
+ errorToCaller = err.(*KeepError)
+ }
continue
}
// Check the file checksum.
DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
ExpiredError = &KeepError{401, "Expired permission signature"}
NotFoundError = &KeepError{404, "Not Found"}
+ VolumeBusyError = &KeepError{503, "Volume backend busy"}
GenericError = &KeepError{500, "Fail"}
FullError = &KeepError{503, "Full"}
SizeRequiredError = &KeepError{411, "Missing Content-Length"}
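
Illustrative client-side sketch (not Arvados SDK code; the keepstore URL and retry policy are hypothetical): the reason for returning VolumeBusyError's 503 instead of 404 is that a retrying client keeps trying a busy backend rather than concluding the block does not exist.

package main

import (
	"fmt"
	"net/http"
	"time"
)

// getWithRetry treats 503 as transient and retries with exponential
// backoff; any other status is returned immediately. A 404 from a busy
// volume would (incorrectly) tell such a client the block is gone.
func getWithRetry(url string, maxTries int) (*http.Response, error) {
	for try := 0; ; try++ {
		resp, err := http.Get(url)
		if err != nil {
			return nil, err
		}
		if resp.StatusCode != http.StatusServiceUnavailable || try+1 >= maxTries {
			return resp, nil
		}
		resp.Body.Close()
		time.Sleep(time.Second << uint(try)) // back off before retrying
	}
}

func main() {
	// Hypothetical keepstore block URL, for illustration only.
	resp, err := getWithRetry("http://keep0.example.org:25107/d41d8cd98f00b204e9800998ecf8427e", 4)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
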
import (
"bytes"
"context"
+ "errors"
"fmt"
"io/ioutil"
"os"
vols := KeepVM.AllWritable()
vols[0].(*MockVolume).Bad = true
+ vols[0].(*MockVolume).BadVolumeError = errors.New("Bad volume")
// Check that PutBlock stores the data as expected.
if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
Timestamps map[string]time.Time
// Bad volumes return an error for every operation.
- Bad bool
+ Bad bool
+ BadVolumeError error
// Touchable volumes' Touch() method succeeds for a locator
// that has been Put().
v.gotCall("Compare")
<-v.Gate
if v.Bad {
- return errors.New("Bad volume")
+ return v.BadVolumeError
} else if block, ok := v.Store[loc]; ok {
if fmt.Sprintf("%x", md5.Sum(block)) != loc {
return DiskHashError
v.gotCall("Get")
<-v.Gate
if v.Bad {
- return 0, errors.New("Bad volume")
+ return 0, v.BadVolumeError
} else if block, ok := v.Store[loc]; ok {
copy(buf[:len(block)], block)
return len(block), nil
v.gotCall("Put")
<-v.Gate
if v.Bad {
- return errors.New("Bad volume")
+ return v.BadVolumeError
}
if v.Readonly {
return MethodDisabledError
var mtime time.Time
var err error
if v.Bad {
- err = errors.New("Bad volume")
+ err = v.BadVolumeError
} else if t, ok := v.Timestamps[loc]; ok {
mtime = t
} else {
permChecker permChecker
subscriptions []v0subscribe
lastMsgID uint64
- log *logrus.Entry
+ log logrus.FieldLogger
mtx sync.Mutex
setupOnce sync.Once
}
arvados_docsite: http://$localip:${services[doc]}/
force_ssl: false
composer_url: http://$localip:${services[composer]}
+ workbench2_url: https://$localip:${services[workbench2-ssl]}
EOF
bundle exec rake assets:precompile