assert_text 'created_at'
if cancelable
- assert_text 'priority: 1' if type.include?('container')
+ assert_text 'priority: 501' if type.include?('container')
if type.include?('pipeline')
assert_selector 'a', text: 'Pause'
first('a,link', text: 'Pause').click
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|rsa|3.4.2|2|python|all
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|uritemplate|3.0.0|2|python|all
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|httplib2|0.9.2|3|python|all
-debian8,debian9,centos7|ws4py|0.4.2|2|python|all
+debian8,debian9,centos7,ubuntu1404,ubuntu1604|ws4py|0.4.2|2|python|all
debian8,debian9,centos7|pykka|1.2.1|2|python|all
debian8,debian9,ubuntu1404,centos7|six|1.10.0|2|python|all
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|ciso8601|1.0.6|3|python|amd64
centos7|psutil|5.0.1|0|python|all
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|lockfile|0.12.2|2|python|all|--epoch 1
debian8,debian9,ubuntu1404,ubuntu1604,ubuntu1804,centos7|subprocess32|3.5.1|2|python|all
-centos7,debian8,debian9,ubuntu1404,ubuntu1604|ruamel.yaml|0.15.34|1|python|amd64|--python-setup-py-arguments --single-version-externally-managed
+all|ruamel.yaml|0.14.12|2|python|amd64|--python-setup-py-arguments --single-version-externally-managed
all|cwltest|1.0.20180518074130|4|python|all|--depends 'python-futures >= 3.0.5' --depends 'python-subprocess32 >= 3.5.0'
all|junit-xml|1.8|3|python|all
all|rdflib-jsonld|0.4.0|2|python|all
RUN yum -q -y install make automake gcc gcc-c++ libyaml-devel patch readline-devel zlib-devel libffi-devel openssl-devel bzip2 libtool bison sqlite-devel rpm-build git perl-ExtUtils-MakeMaker libattr-devel nss-devel libcurl-devel which tar unzip scl-utils centos-release-scl postgresql-devel python-devel python-setuptools fuse-devel xz-libs git
# Install RVM
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+RUN gpg --keyserver ha.pool.sks-keyservers.net --recv-keys D39DC0E3 && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
/usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip
# Install RVM
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+RUN gpg --keyserver ha.pool.sks-keyservers.net --recv-keys D39DC0E3 && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
/usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip
# Install RVM
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+RUN gpg --keyserver ha.pool.sks-keyservers.net --recv-keys D39DC0E3 && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
/usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev libgnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip tzdata
# Install RVM
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+RUN gpg --keyserver ha.pool.sks-keyservers.net --recv-keys D39DC0E3 && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
/usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-pip libcurl4-gnutls-dev libgnutls28-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip tzdata
# Install RVM
-RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+RUN gpg --keyserver ha.pool.sks-keyservers.net --recv-keys D39DC0E3 && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
/usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
# Install RVM
RUN touch /var/lib/rpm/* && \
- gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+ gpg --keyserver ha.pool.sks-keyservers.net --recv-keys D39DC0E3 && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
/usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
# Install RVM
RUN apt-get update && \
apt-get -y install --no-install-recommends curl ca-certificates && \
- gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+ gpg --keyserver ha.pool.sks-keyservers.net --recv-keys D39DC0E3 && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
/usr/local/rvm/bin/rvm alias create default ruby-2.3
# Install dependencies and RVM
RUN apt-get update && \
apt-get -y install --no-install-recommends curl ca-certificates python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip binutils build-essential ca-certificates && \
- gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+ gpg --keyserver ha.pool.sks-keyservers.net --recv-keys D39DC0E3 && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
/usr/local/rvm/bin/rvm alias create default ruby-2.3
# Install RVM
RUN apt-get update && \
apt-get -y install --no-install-recommends curl ca-certificates && \
- gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
+ gpg --keyserver ha.pool.sks-keyservers.net --recv-keys D39DC0E3 && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
/usr/local/rvm/bin/rvm alias create default ruby-2.3
declare -a failures
declare -A skip
+declare -A only
declare -A testargs
skip[apps/workbench_profile]=1
# nodemanager_integration tests are not reliable, see #12061.
skip[$1]=1; shift
;;
--only)
- only="$1"; skip[$1]=""; shift
+ only[$1]=1; skip[$1]=""; shift
;;
--short)
short=1
# required when testing it. Skip that step if it is not needed.
NEED_SDK_R=true
-if [[ ! -z "${only}" && "${only}" != "sdk/R" ]]; then
+if [[ ${#only[@]} -ne 0 ]] &&
+ [[ -z "${only['sdk/R']}" && -z "${only['doc']}" ]]; then
NEED_SDK_R=false
fi
-if [[ ! -z "${skip}" && "${skip}" == "sdk/R" ]]; then
+if [[ ${skip["sdk/R"]} == 1 && ${skip["doc"]} == 1 ]]; then
NEED_SDK_R=false
fi
+if [[ $NEED_SDK_R == false ]]; then
+ echo "R SDK not needed, it will not be installed."
+fi
+
start_services() {
echo 'Starting API, keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
if [[ ! -d "$WORKSPACE/services/api/log" ]]; then
;;
esac
if [[ -z "${skip[$suite]}" && -z "${skip[$1]}" && \
- (-z "${only}" || "${only}" == "${suite}" || \
- "${only}" == "${1}") ||
- "${only}" == "${2}" ]]; then
+ (${#only[@]} -eq 0 || ${only[$suite]} -eq 1 || \
+ ${only[$1]} -eq 1) ||
+ ${only[$2]} -eq 1 ]]; then
retry do_test_once ${@}
else
title "Skipping ${1} tests"
Here we create a default project for the standard Arvados Docker images, and give all users read access to it. The project is owned by the system user.
<notextile>
-<pre><code>~$ <span class="userinput">project_uuid=`arv --format=uuid group create --group "{\"owner_uuid\":\"$prefix-tpzed-000000000000000\", \"name\":\"Arvados Standard Docker Images\"}"`</span>
+<pre><code>~$ <span class="userinput">uuid_prefix=`arv --format=uuid user current | cut -d- -f1`</span>
+~$ <span class="userinput">project_uuid=`arv --format=uuid group create --group "{\"owner_uuid\":\"$uuid_prefix-tpzed-000000000000000\", \"name\":\"Arvados Standard Docker Images\"}"`</span>
~$ <span class="userinput">echo "Arvados project uuid is '$project_uuid'"</span>
~$ <span class="userinput">read -rd $'\000' newlink <<EOF; arv link create --link "$newlink"</span>
<span class="userinput">{
Here we create a repository object which will be used to set up a hosted clone of the arvados repository on this cluster.
<notextile>
-<pre><code>~$ <span class="userinput">prefix=`arv --format=uuid user current | cut -d- -f1`</span>
-~$ <span class="userinput">echo "Site prefix is '$prefix'"</span>
-~$ <span class="userinput">all_users_group_uuid="$prefix-j7d0g-fffffffffffffff"</span>
-~$ <span class="userinput">repo_uuid=`arv --format=uuid repository create --repository "{\"owner_uuid\":\"$prefix-tpzed-000000000000000\", \"name\":\"arvados\"}"`</span>
+<pre><code>~$ <span class="userinput">uuid_prefix=`arv --format=uuid user current | cut -d- -f1`</span>
+~$ <span class="userinput">echo "Site prefix is '$uuid_prefix'"</span>
+~$ <span class="userinput">all_users_group_uuid="$uuid_prefix-j7d0g-fffffffffffffff"</span>
+~$ <span class="userinput">repo_uuid=`arv --format=uuid repository create --repository "{\"owner_uuid\":\"$uuid_prefix-tpzed-000000000000000\", \"name\":\"arvados\"}"`</span>
~$ <span class="userinput">echo "Arvados repository uuid is '$repo_uuid'"</span>
</code></pre></notextile>
Use this command to register each keepstore server you have installed. Make sure to update the @service_host@ value.
<notextile>
-<pre><code>~$ <span class="userinput">prefix=`arv --format=uuid user current | cut -d- -f1`</span>
-~$ <span class="userinput">echo "Site prefix is '$prefix'"</span>
+<pre><code>~$ <span class="userinput">uuid_prefix=`arv --format=uuid user current | cut -d- -f1`</span>
+~$ <span class="userinput">echo "Site prefix is '$uuid_prefix'"</span>
~$ <span class="userinput">read -rd $'\000' keepservice <<EOF; arv keep_service create --keep-service "$keepservice"</span>
<span class="userinput">{
"service_host":"<strong>keep0.$uuid_prefix.your.domain</strong>",
s.executables << "arv-crunch-job"
s.executables << "arv-tag"
s.required_ruby_version = '>= 2.1.0'
- s.add_runtime_dependency 'arvados', '~> 1.1.0', '>= 1.1.4'
+ s.add_runtime_dependency 'arvados', '~> 1.2.0', '>= 1.2.0'
# Our google-api-client dependency used to be < 0.9, but that could be
# satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
- s.add_runtime_dependency 'google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
+ s.add_runtime_dependency 'cure-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5'
s.add_runtime_dependency 'json', '>= 1.7.7', '<3'
- s.add_runtime_dependency 'trollop', '~> 2.0'
+ s.add_runtime_dependency 'optimist', '~> 3.0'
s.add_runtime_dependency 'andand', '~> 1.3', '>= 1.3.3'
s.add_runtime_dependency 'oj', '~> 3.0'
s.add_runtime_dependency 'curb', '~> 0.8'
require 'andand'
require 'curb'
require 'oj'
- require 'trollop'
+ require 'optimist'
rescue LoadError => error
abort <<-EOS
Please install all required gems:
- gem install arvados activesupport andand curb json oj trollop
+ gem install arvados activesupport andand curb json oj optimist
EOS
end
def arv_create client, arvados, global_opts, remaining_opts
types = resource_types(arvados.discovery_document)
- create_opts = Trollop::options do
+ create_opts = Optimist::options do
opt :project_uuid, "Project uuid in which to create the object", :type => :string
stop_on resource_types(arvados.discovery_document)
end
rsc = rsc.first
discovered_params = arvados.discovery_document["resources"][rsc]["methods"]["create"]["parameters"]
- method_opts = Trollop::options do
+ method_opts = Optimist::options do
banner head_banner
banner "Usage: arv create [--project-uuid] #{object_type} [create parameters]"
banner ""
opts = Hash.new()
opts[:type] = v["type"].to_sym if v.include?("type")
if [:datetime, :text, :object, :array].index opts[:type]
- opts[:type] = :string # else trollop bork
+ opts[:type] = :string # else optimist bork
end
opts[:default] = v["default"] if v.include?("default")
opts[:default] = v["default"].to_i if opts[:type] == :integer
def parse_arguments(discovery_document, subcommands)
resources_and_subcommands = resource_types(discovery_document) + subcommands
- option_parser = Trollop::Parser.new do
+ option_parser = Optimist::Parser.new do
version __FILE__
banner head_banner
banner "Usage: arv [--flags] subcommand|resource [method] [--parameters]"
stop_on resources_and_subcommands
end
- global_opts = Trollop::with_standard_exception_handling option_parser do
+ global_opts = Optimist::with_standard_exception_handling option_parser do
o = option_parser.parse ARGV
end
discovered_params = discovery_document\
["resources"][resource.pluralize]\
["methods"][method]["parameters"]
- method_opts = Trollop::options do
+ method_opts = Optimist::options do
banner head_banner
banner "Usage: arv #{resource} #{method} [--parameters]"
banner ""
opts = Hash.new()
opts[:type] = v["type"].to_sym if v.include?("type")
if [:datetime, :text, :object, :array].index opts[:type]
- opts[:type] = :string # else trollop bork
+ opts[:type] = :string # else optimist bork
end
opts[:default] = v["default"] if v.include?("default")
opts[:default] = v["default"].to_i if opts[:type] == :integer
elsif resource_body_is_readable_file
resource_body = resource_body_file.read()
begin
- # we don't actually need the results of the parsing,
+ # we don't actually need the results of the parsing,
# just checking for the JSON::ParserError exception
JSON.parse resource_body
rescue JSON::ParserError => e
require 'rubygems'
require 'json'
require 'pp'
- require 'trollop'
+ require 'optimist'
require 'google/api_client'
rescue LoadError => l
$stderr.puts $:
abort <<-EOS
#{$0}: fatal: #{l.message}
Some runtime dependencies may be missing.
-Try: gem install arvados pp google-api-client json trollop
+Try: gem install arvados pp google-api-client json optimist
EOS
end
# Parse command line options (the kind that control the behavior of
# this program, that is, not the pipeline component parameters).
-p = Trollop::Parser.new do
+p = Optimist::Parser.new do
version __FILE__
banner(<<EOF)
type: :string)
stop_on [:'--']
end
-$options = Trollop::with_standard_exception_handling p do
+$options = Optimist::with_standard_exception_handling p do
p.parse ARGV
end
$debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
require 'json'
require 'pp'
require 'oj'
- require 'trollop'
+ require 'optimist'
rescue LoadError
abort <<-EOS
#{$0}: fatal: some runtime dependencies are missing.
-Try: gem install pp google-api-client json trollop
+Try: gem install pp google-api-client json optimist
EOS
end
end
end
-global_opts = Trollop::options do
+global_opts = Optimist::options do
banner usage_string
banner ""
opt :dry_run, "Don't actually do anything", :short => "-n"
stop_on ['add', 'remove']
end
-p = Trollop::Parser.new do
+p = Optimist::Parser.new do
opt(:all,
"Remove this tag from all objects under your ownership. Only valid with `tag remove'.",
:short => :none)
:short => :o)
end
-$options = Trollop::with_standard_exception_handling p do
+$options = Optimist::with_standard_exception_handling p do
p.parse ARGV
end
'cwltool==1.0.20180806194258',
'schema-salad==2.7.20180719125426',
'typing >= 3.6.4',
- 'ruamel.yaml >=0.13.11, <0.16',
+ # Need to limit ruamel.yaml version to 0.15.26 because of bug
+ # https://bitbucket.org/ruamel/yaml/issues/227/regression-parsing-flow-mapping
+ 'ruamel.yaml >=0.13.11, <= 0.15.26',
'arvados-python-client>=1.1.4.20180607143841',
'setuptools',
'ciso8601 >=1.0.6, <2.0.0',
import arvados_cwl
import arvados_cwl.context
from arvados_cwl.arvdocker import arv_docker_clear_cache
+import arvados.config
import logging
import mock
import unittest
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+class CollectionMock(object):
+ def __init__(self, vwdmock, *args, **kwargs):
+ self.vwdmock = vwdmock
+ self.count = 0
+
+ def open(self, *args, **kwargs):
+ self.count += 1
+ return self.vwdmock.open(*args, **kwargs)
+
+ def copy(self, *args, **kwargs):
+ self.count += 1
+ self.vwdmock.copy(*args, **kwargs)
+
+ def save_new(self, *args, **kwargs):
+ pass
+
+ def __len__(self):
+ return self.count
+
+ def portable_data_hash(self):
+ if self.count == 0:
+ return arvados.config.EMPTY_BLOCK_LOCATOR
+ else:
+ return "99999999999999999999999999999996+99"
+
+
class TestContainer(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
runner.fs_access.get_collection.side_effect = get_collection_mock
vwdmock = mock.MagicMock()
- collection_mock.return_value = vwdmock
- vwdmock.portable_data_hash.return_value = "99999999999999999999999999999996+99"
+ collection_mock.side_effect = lambda *args, **kwargs: CollectionMock(vwdmock, *args, **kwargs)
tool = cmap({
"inputs": [],
from schema_salad.sourceline import cmap
from .mock_discovery import get_rootDesc
from .matcher import JsonDiffMatcher, StripYAMLComments
+from .test_container import CollectionMock
if not os.getenv('ARVADOS_DEBUG'):
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
metadata["cwlVersion"] = tool["cwlVersion"]
- mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
+ mockc = mock.MagicMock()
+ mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
'HOME': '$(task.outdir)',
'TMPDIR': '$(task.tmpdir)'},
'task.vwd': {
- 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
- 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
+ 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
+ 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
},
'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
'task.stdout': 'cwl.output.json'}]},
['docker_image_locator', 'in docker', 'arvados/jobs']],
find_or_create=True)
- mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
- mockcollection().open().__enter__().write.assert_has_calls([mock.call(
+ mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
+ mockc.open().__enter__().write.assert_has_calls([mock.call(
'''{
"fileblub": {
"basename": "token.txt",
tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
metadata["cwlVersion"] = tool["cwlVersion"]
- mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
+ mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
arvtool.formatgraph = None
'HOME': '$(task.outdir)',
'TMPDIR': '$(task.tmpdir)'},
'task.vwd': {
- 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
- 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
+ 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
+ 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
},
'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
'task.stdout': 'cwl.output.json'}]},
'manifest_text':
'. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'replication_desired': None,
- 'name': 'submit_tool.cwl dependencies',
- }), ensure_unique_name=True),
+ 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+ }), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
- 'name': 'submit_wf.cwl input',
- }), ensure_unique_name=True),
+ 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
+ }), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 61df2ed9ee3eb7dd9b799e5ca35305fa+1217 0:1217:workflow.cwl\n',
'manifest_text':
'. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'replication_desired': None,
- 'name': 'submit_tool.cwl dependencies',
- }), ensure_unique_name=True),
+ 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+ }), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
- 'name': 'submit_wf.cwl input',
- }), ensure_unique_name=True)])
+ 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
+ }), ensure_unique_name=False)])
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
ApiServer: c.APIHost,
ApiToken: c.AuthToken,
ApiInsecure: c.Insecure,
- Client: &http.Client{Transport: &http.Transport{
- TLSClientConfig: MakeTLSConfig(c.Insecure)}},
+ Client: &http.Client{
+ Timeout: 5 * time.Minute,
+ Transport: &http.Transport{
+ TLSClientConfig: MakeTLSConfig(c.Insecure)},
+ },
External: false,
Retries: 2,
KeepServiceURIs: c.KeepServiceURIs,
import (
"context"
"fmt"
- "log"
"sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "github.com/Sirupsen/logrus"
)
const (
Cancelled = arvados.ContainerStateCancelled
)
+type Logger interface {
+ Printf(string, ...interface{})
+ Warnf(string, ...interface{})
+ Debugf(string, ...interface{})
+}
+
// Dispatcher struct
type Dispatcher struct {
Arv *arvadosclient.ArvadosClient
+ Logger Logger
+
// Batch size for container queries
BatchSize int64
// dispatcher's token. When a new one appears, Run calls RunContainer
// in a new goroutine.
func (d *Dispatcher) Run(ctx context.Context) error {
+ if d.Logger == nil {
+ d.Logger = logrus.StandardLogger()
+ }
+
err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
if err != nil {
return fmt.Errorf("error getting my token UUID: %v", err)
// Containers that I know about that didn't show up in any
// query should be let go.
for uuid, tracker := range todo {
- log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+ d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
tracker.close()
}
// Start a runner in a new goroutine, and send the initial container
// record to its updates channel.
func (d *Dispatcher) start(c arvados.Container) *runTracker {
- tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+ tracker := &runTracker{
+ updates: make(chan arvados.Container, 1),
+ logger: d.Logger,
+ }
tracker.updates <- c
go func() {
d.RunContainer(d, c, tracker.updates)
"order": []string{"priority desc"}}
err := d.Arv.List("containers", params, &countList)
if err != nil {
- log.Printf("error getting count of containers: %q", err)
+ d.Logger.Warnf("error getting count of containers: %q", err)
return false
}
itemsAvailable := countList.ItemsAvailable
err := d.Arv.List("containers", params, &list)
if err != nil {
- log.Printf("Error getting list of containers: %q", err)
+ d.Logger.Warnf("error getting list of containers: %q", err)
return false
}
d.checkListForUpdates(list.Items, todo)
delete(todo, c.UUID)
if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
- log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+ d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
} else if alreadyTracking {
switch c.State {
case Queued:
}
err := d.lock(c.UUID)
if err != nil {
- log.Printf("debug: error locking container %s: %s", c.UUID, err)
+ d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
break
}
c.State = Locked
"container": arvadosclient.Dict{"state": state},
}, nil)
if err != nil {
- log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
+ d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
}
return err
}
type runTracker struct {
closing bool
updates chan arvados.Container
+ logger Logger
}
func (tracker *runTracker) close() {
}
select {
case <-tracker.updates:
- log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+ tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
default:
}
tracker.updates <- c
# Copyright (C) The Arvados Authors. All rights reserved.
+# Copyright (C) 2018 Genome Research Ltd.
#
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
from __future__ import print_function
from __future__ import absolute_import
import errno
import arvados.commands._util as arv_cmd
import arvados.collection
+import arvados.config as config
from arvados._version import __version__
for src in iterfiles:
write_file(collection, pathprefix, os.path.join(root, src), not packed)
- filters=[["portable_data_hash", "=", collection.portable_data_hash()]]
- if name:
- filters.append(["name", "like", name+"%"])
- if project:
- filters.append(["owner_uuid", "=", project])
-
- exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
-
- if exists["items"]:
- item = exists["items"][0]
- pdh = item["portable_data_hash"]
- logger.info("Using collection %s (%s)", pdh, item["uuid"])
- elif len(collection) > 0:
- collection.save_new(name=name, owner_uuid=project, ensure_unique_name=True)
+ pdh = None
+ if len(collection) > 0:
+ # non-empty collection
+ filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
+ name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
+ if name:
+ filters.append(["name", "=", name_pdh])
+ if project:
+ filters.append(["owner_uuid", "=", project])
+
+ # do the list / create in a loop with up to 2 tries as we are using `ensure_unique_name=False`
+ # and there is a potential race with other workflows that may have created the collection
+ # between when we list it and find it does not exist and when we attempt to create it.
+ tries = 2
+ while pdh is None and tries > 0:
+ exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
+
+ if exists["items"]:
+ item = exists["items"][0]
+ pdh = item["portable_data_hash"]
+ logger.info("Using collection %s (%s)", pdh, item["uuid"])
+ else:
+ try:
+ collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=False)
+ pdh = collection.portable_data_hash()
+ logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
+ except arvados.errors.ApiError as ae:
+ tries -= 1
+ if pdh is None:
+ # Something weird going on here, probably a collection
+ # with a conflicting name but wrong PDH. We won't
+ # able to reuse it but we still need to save our
+ # collection, so so save it with unique name.
+ logger.info("Name conflict on '%s', existing collection has an unexpected portable data hash", name_pdh)
+ collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=True)
+ pdh = collection.portable_data_hash()
+ logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
+ else:
+ # empty collection
pdh = collection.portable_data_hash()
- logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
+ assert (pdh == config.EMPTY_BLOCK_LOCATOR), "Empty collection portable_data_hash did not have expected locator, was %s" % pdh
+ logger.info("Using empty collection %s", pdh)
for c in files:
c.keepref = "%s/%s" % (pdh, c.fn)
'google-api-python-client >=1.6.2, <1.7',
'httplib2 >=0.9.2',
'pycurl >=7.19.5.1',
- 'ruamel.yaml >=0.13.11, <0.16',
+ 'ruamel.yaml >=0.13.11, <= 0.15.26',
'setuptools',
'ws4py >=0.4.2',
'subprocess32 >=3.5.1',
f.write("""
Clusters:
zzzzz:
+ HTTPRequestTimeout: 30s
PostgreSQL:
ConnectionPool: 32
Connection:
s.add_dependency('andand', '~> 1.3', '>= 1.3.3')
# Our google-api-client dependency used to be < 0.9, but that could be
# satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
- s.add_dependency('google-api-client', '>= 0.7', '< 0.8.9')
+ s.add_dependency('cure-google-api-client', '>= 0.7', '< 0.8.9')
# work around undeclared dependency on i18n in some activesupport 3.x.x:
s.add_dependency('i18n', '~> 0')
s.add_dependency('json', '>= 1.7.7', '<3')
end
end
- def self.where_serialized(colname, value)
+ def self.where_serialized(colname, value, md5: false)
+ colsql = colname.to_s
+ if md5
+ colsql = "md5(#{colsql})"
+ end
if value.empty?
# rails4 stores as null, rails3 stored as serialized [] or {}
- sql = "#{colname.to_s} is null or #{colname.to_s} IN (?)"
+ sql = "#{colsql} is null or #{colsql} IN (?)"
sorted = value
else
- sql = "#{colname.to_s} IN (?)"
+ sql = "#{colsql} IN (?)"
sorted = deep_sort_hash(value)
end
- where(sql, [sorted.to_yaml, SafeJSON.dump(sorted)])
+ params = [sorted.to_yaml, SafeJSON.dump(sorted)]
+ if md5
+ params = params.map { |x| Digest::MD5.hexdigest(x) }
+ end
+ where(sql, params)
end
Serializer = {
def self.find_reusable(attrs)
log_reuse_info { "starting with #{Container.all.count} container records in database" }
- candidates = Container.where_serialized(:command, attrs[:command])
+ candidates = Container.where_serialized(:command, attrs[:command], md5: true)
log_reuse_info(candidates) { "after filtering on command #{attrs[:command].inspect}" }
candidates = candidates.where('cwd = ?', attrs[:cwd])
log_reuse_info(candidates) { "after filtering on cwd #{attrs[:cwd].inspect}" }
- candidates = candidates.where_serialized(:environment, attrs[:environment])
+ candidates = candidates.where_serialized(:environment, attrs[:environment], md5: true)
log_reuse_info(candidates) { "after filtering on environment #{attrs[:environment].inspect}" }
candidates = candidates.where('output_path = ?', attrs[:output_path])
candidates = candidates.where('container_image = ?', image)
log_reuse_info(candidates) { "after filtering on container_image #{image.inspect} (resolved from #{attrs[:container_image].inspect})" }
- candidates = candidates.where_serialized(:mounts, resolve_mounts(attrs[:mounts]))
+ candidates = candidates.where_serialized(:mounts, resolve_mounts(attrs[:mounts]), md5: true)
log_reuse_info(candidates) { "after filtering on mounts #{attrs[:mounts].inspect}" }
- candidates = candidates.where('secret_mounts_md5 = ?', Digest::MD5.hexdigest(SafeJSON.dump(self.deep_sort_hash(attrs[:secret_mounts]))))
- log_reuse_info(candidates) { "after filtering on mounts #{attrs[:mounts].inspect}" }
+ secret_mounts_md5 = Digest::MD5.hexdigest(SafeJSON.dump(self.deep_sort_hash(attrs[:secret_mounts])))
+ candidates = candidates.where('secret_mounts_md5 = ?', secret_mounts_md5)
+ log_reuse_info(candidates) { "after filtering on secret_mounts_md5 #{secret_mounts_md5.inspect}" }
- candidates = candidates.where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints]))
+ candidates = candidates.where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints]), md5: true)
log_reuse_info(candidates) { "after filtering on runtime_constraints #{attrs[:runtime_constraints].inspect}" }
log_reuse_info { "checking for state=Complete with readable output and log..." }
c = get_requesting_container()
if !c.nil?
self.requesting_container_uuid = c.uuid
- self.priority = c.priority>0 ? 1 : 0
+ # Determine the priority of container request for the requesting
+ # container.
+ self.priority = ContainerRequest.
+ where('container_uuid=? and priority>0', self.requesting_container_uuid).
+ map do |cr|
+ cr.priority
+ end.max || 0
end
end
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class AddLockIndexToContainers < ActiveRecord::Migration
+ def change
+ # For the current code in sdk/go/dispatch:
+ add_index :containers, [:locked_by_uuid, :priority]
+ # For future dispatchers that use filters instead of offset for
+ # more predictable paging:
+ add_index :containers, [:locked_by_uuid, :uuid]
+ end
+end
--- /dev/null
+class AddMd5IndexToContainers < ActiveRecord::Migration
+ def up
+ ActiveRecord::Base.connection.execute 'CREATE INDEX index_containers_on_reuse_columns on containers (md5(command), cwd, md5(environment), output_path, container_image, md5(mounts), secret_mounts_md5, md5(runtime_constraints))'
+ end
+ def down
+ ActiveRecord::Base.connection.execute 'DROP INDEX index_containers_on_reuse_columns'
+ end
+end
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class AddQueueIndexToContainers < ActiveRecord::Migration
+ def up
+ ActiveRecord::Base.connection.execute 'CREATE INDEX index_containers_on_queued_state on containers (state, (priority > 0))'
+ end
+ def down
+ ActiveRecord::Base.connection.execute 'DROP INDEX index_containers_on_queued_state'
+ end
+end
CREATE INDEX index_containers_on_auth_uuid ON public.containers USING btree (auth_uuid);
+--
+-- Name: index_containers_on_locked_by_uuid_and_priority; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_containers_on_locked_by_uuid_and_priority ON public.containers USING btree (locked_by_uuid, priority);
+
+
+--
+-- Name: index_containers_on_locked_by_uuid_and_uuid; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_containers_on_locked_by_uuid_and_uuid ON public.containers USING btree (locked_by_uuid, uuid);
+
+
--
-- Name: index_containers_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
--
CREATE INDEX index_containers_on_owner_uuid ON public.containers USING btree (owner_uuid);
+--
+-- Name: index_containers_on_queued_state; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_containers_on_queued_state ON public.containers USING btree (state, ((priority > 0)));
+
+
+--
+-- Name: index_containers_on_reuse_columns; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_containers_on_reuse_columns ON public.containers USING btree (md5(command), cwd, md5(environment), output_path, container_image, md5(mounts), secret_mounts_md5, md5(runtime_constraints));
+
+
--
-- Name: index_containers_on_secret_mounts_md5; Type: INDEX; Schema: public; Owner: -
--
INSERT INTO schema_migrations (version) VALUES ('20180820130357');
+INSERT INTO schema_migrations (version) VALUES ('20180820132617');
+
INSERT INTO schema_migrations (version) VALUES ('20180820135808');
+INSERT INTO schema_migrations (version) VALUES ('20180824152014');
+
+INSERT INTO schema_migrations (version) VALUES ('20180824155207');
+
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
name: running
state: Committed
- priority: 1
+ priority: 501
created_at: <%= 2.minute.ago.to_s(:db) %>
updated_at: <%= 1.minute.ago.to_s(:db) %>
modified_at: <%= 1.minute.ago.to_s(:db) %>
end
end
+ test "list trashed collections and projects" do
+ authorize_with :active
+ get(:contents, {
+ format: :json,
+ include_trash: true,
+ filters: [
+ ['uuid', 'is_a', ['arvados#collection', 'arvados#group']],
+ ['is_trashed', '=', true],
+ ],
+ limit: 10000,
+ })
+ assert_response :success
+ found_uuids = json_response['items'].collect { |i| i['uuid'] }
+ assert_includes found_uuids, groups(:trashed_project).uuid
+ refute_includes found_uuids, groups(:aproject).uuid
+ assert_includes found_uuids, collections(:expired_collection).uuid
+ refute_includes found_uuids, collections(:w_a_z_file).uuid
+ end
+
test "list objects in home project" do
authorize_with :active
get :contents, {
assert actually_checked_anything, "Didn't even find two items to compare."
end
- test 'list objects across multiple projects' do
- authorize_with :project_viewer
- get :contents, {
- format: :json,
- filters: [['uuid', 'is_a', 'arvados#specimen']]
- }
- assert_response :success
- found_uuids = json_response['items'].collect { |i| i['uuid'] }
- [[:in_aproject, true],
- [:in_asubproject, true],
- [:owned_by_private_group, false]].each do |specimen_fixture, should_find|
- if should_find
- assert_includes found_uuids, specimens(specimen_fixture).uuid, "did not find specimen fixture '#{specimen_fixture}'"
- else
- refute_includes found_uuids, specimens(specimen_fixture).uuid, "found specimen fixture '#{specimen_fixture}'"
- end
- end
- end
-
# Even though the project_viewer tests go through other controllers,
# I'm putting them here so they're easy to find alongside the other
# project tests.
end
[
- ['running_container_auth', 'zzzzz-dz642-runningcontainr', 1],
+ ['running_container_auth', 'zzzzz-dz642-runningcontainr', 501],
['active_no_prefs', nil, 0],
].each do |token, expected, expected_priority|
test "create as #{token} and expect requesting_container_uuid to be #{expected}" do
"context"
"flag"
"fmt"
- "log"
"os"
"os/exec"
"os/signal"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "github.com/Sirupsen/logrus"
)
var version = "dev"
func main() {
err := doMain()
if err != nil {
- log.Fatalf("%q", err)
+ logrus.Fatalf("%q", err)
}
}
)
func doMain() error {
+ logger := logrus.StandardLogger()
+ if os.Getenv("DEBUG") != "" {
+ logger.SetLevel(logrus.DebugLevel)
+ }
+ logger.Formatter = &logrus.JSONFormatter{
+ TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+ }
+
flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
pollInterval := flags.Int(
return nil
}
- log.Printf("crunch-dispatch-local %s started", version)
+ logger.Printf("crunch-dispatch-local %s started", version)
runningCmds = make(map[string]*exec.Cmd)
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Printf("Error making Arvados client: %v", err)
+ logger.Errorf("error making Arvados client: %v", err)
return err
}
arv.Retries = 25
dispatcher := dispatch.Dispatcher{
+ Logger: logger,
Arv: arv,
RunContainer: run,
PollPeriod: time.Duration(*pollInterval) * time.Second,
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
sig := <-c
- log.Printf("Received %s, shutting down", sig)
+ logger.Printf("Received %s, shutting down", sig)
signal.Stop(c)
cancel()
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stderr
- log.Printf("Starting container %v", uuid)
+ dispatcher.Logger.Printf("starting container %v", uuid)
// Add this crunch job to the list of runningCmds only if we
// succeed in starting crunch-run.
runningCmdsMutex.Lock()
if err := startCmd(container, cmd); err != nil {
runningCmdsMutex.Unlock()
- log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
+ dispatcher.Logger.Warnf("error starting %q for %s: %s", *crunchRunCommand, uuid, err)
dispatcher.UpdateState(uuid, dispatch.Cancelled)
} else {
runningCmds[uuid] = cmd
go func() {
if _, err := cmd.Process.Wait(); err != nil {
- log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+ dispatcher.Logger.Warnf("error while waiting for crunch job to finish for %v: %q", uuid, err)
}
- log.Printf("sending done")
+ dispatcher.Logger.Debugf("sending done")
done <- struct{}{}
}()
case c := <-status:
// Interrupt the child process if priority changes to 0
if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
- log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+ dispatcher.Logger.Printf("sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
cmd.Process.Signal(os.Interrupt)
}
}
}
close(done)
- log.Printf("Finished container run for %v", uuid)
+ dispatcher.Logger.Printf("finished container run for %v", uuid)
// Remove the crunch job from runningCmds
runningCmdsMutex.Lock()
// If the container is not finalized, then change it to "Cancelled".
err := dispatcher.Arv.Get("containers", uuid, nil, &container)
if err != nil {
- log.Printf("Error getting final container state: %v", err)
+ dispatcher.Logger.Warnf("error getting final container state: %v", err)
}
if container.State == dispatch.Locked || container.State == dispatch.Running {
- log.Printf("After %s process termination, container state for %v is %q. Updating it to %q",
- *crunchRunCommand, container.State, uuid, dispatch.Cancelled)
+ dispatcher.Logger.Warnf("after %q process termination, container state for %v is %q; updating it to %q",
+ *crunchRunCommand, uuid, container.State, dispatch.Cancelled)
dispatcher.UpdateState(uuid, dispatch.Cancelled)
}
for range status {
}
- log.Printf("Finalized container %v", uuid)
+ dispatcher.Logger.Printf("finalized container %v", uuid)
}
"bytes"
"context"
"io"
- "log"
"net/http"
"net/http/httptest"
"os"
"os/exec"
- "strings"
+ "regexp"
"testing"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "github.com/Sirupsen/logrus"
. "gopkg.in/check.v1"
)
initialArgs = os.Args
arvadostest.StartAPI()
runningCmds = make(map[string]*exec.Cmd)
+ logrus.SetFormatter(&logrus.TextFormatter{DisableColors: true})
}
func (s *TestSuite) TearDownSuite(c *C) {
arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1, "locked_by_uuid": "` + arvadostest.Dispatch1AuthUUID + `"}`)}
testWithServerStub(c, apiStubResponses, "echo",
- `After echo process termination, container state for Running is "zzzzz-dz642-xxxxxxxxxxxxxx2". Updating it to "Cancelled"`)
+ `after \\"echo\\" process termination, container state for zzzzz-dz642-xxxxxxxxxxxxxx2 is \\"Running\\"; updating it to \\"Cancelled\\"`)
}
func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3/lock"] =
arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Locked", "priority":1}`)}
- testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting nosuchcommand for zzzzz-dz642-xxxxxxxxxxxxxx3")
+ testWithServerStub(c, apiStubResponses, "nosuchcommand", `error starting \\"nosuchcommand\\" for zzzzz-dz642-xxxxxxxxxxxxxx3`)
}
func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
}
buf := bytes.NewBuffer(nil)
- log.SetOutput(io.MultiWriter(buf, os.Stderr))
- defer log.SetOutput(os.Stderr)
+ logrus.SetOutput(io.MultiWriter(buf, os.Stderr))
+ defer logrus.SetOutput(os.Stderr)
*crunchRunCommand = crunchCmd
ctx, cancel := context.WithCancel(context.Background())
dispatcher := dispatch.Dispatcher{
Arv: arv,
- PollPeriod: time.Duration(1) * time.Second,
+ PollPeriod: time.Second / 20,
RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
run(d, c, s)
cancel()
return cmd.Start()
}
+ re := regexp.MustCompile(`(?ms).*` + expected + `.*`)
go func() {
- for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+ for i := 0; i < 80 && !re.MatchString(buf.String()); i++ {
time.Sleep(100 * time.Millisecond)
}
cancel()
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "github.com/Sirupsen/logrus"
"github.com/coreos/go-systemd/daemon"
)
+type logger interface {
+ dispatch.Logger
+ Fatalf(string, ...interface{})
+}
+
const initialNiceValue int64 = 10000
var (
type Dispatcher struct {
*dispatch.Dispatcher
+ logger logrus.FieldLogger
cluster *arvados.Cluster
sqCheck *SqueueChecker
slurm Slurm
}
func main() {
- disp := &Dispatcher{}
+ logger := logrus.StandardLogger()
+ if os.Getenv("DEBUG") != "" {
+ logger.SetLevel(logrus.DebugLevel)
+ }
+ logger.Formatter = &logrus.JSONFormatter{
+ TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+ }
+ disp := &Dispatcher{logger: logger}
err := disp.Run(os.Args[0], os.Args[1:])
if err != nil {
- log.Fatal(err)
+ logrus.Fatalf("%s", err)
}
}
return nil
}
- log.Printf("crunch-dispatch-slurm %s started", version)
+ disp.logger.Printf("crunch-dispatch-slurm %s started", version)
err := disp.readConfig(*configPath)
if err != nil {
os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
} else {
- log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
+ disp.logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
}
if *dumpConfig {
siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
if os.IsNotExist(err) {
- log.Printf("warning: no cluster config (%s), proceeding with no node types defined", err)
+ disp.logger.Warnf("no cluster config (%s), proceeding with no node types defined", err)
} else if err != nil {
return fmt.Errorf("error loading config: %s", err)
} else if disp.cluster, err = siteConfig.GetCluster(""); err != nil {
// setup() initializes private fields after configure().
func (disp *Dispatcher) setup() {
+ if disp.logger == nil {
+ disp.logger = logrus.StandardLogger()
+ }
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Fatalf("Error making Arvados client: %v", err)
+ disp.logger.Fatalf("Error making Arvados client: %v", err)
}
arv.Retries = 25
disp.slurm = NewSlurmCLI()
disp.sqCheck = &SqueueChecker{
+ Logger: disp.logger,
Period: time.Duration(disp.PollPeriod),
PrioritySpread: disp.PrioritySpread,
Slurm: disp.slurm,
}
disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
+ Logger: disp.logger,
BatchSize: disp.BatchSize,
RunContainer: disp.runContainer,
PollPeriod: time.Duration(disp.PollPeriod),
case <-ctx.Done():
// Disappeared from squeue
if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
- log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
+ log.Printf("error getting final container state for %s: %s", ctr.UUID, err)
}
switch ctr.State {
case dispatch.Running:
"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"net/http/httptest"
"os"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "github.com/Sirupsen/logrus"
. "gopkg.in/check.v1"
)
}
s.disp.slurm = &s.slurm
- s.disp.sqCheck = &SqueueChecker{Period: 500 * time.Millisecond, Slurm: s.disp.slurm}
+ s.disp.sqCheck = &SqueueChecker{
+ Logger: logrus.StandardLogger(),
+ Period: 500 * time.Millisecond,
+ Slurm: s.disp.slurm,
+ }
err = s.disp.Dispatcher.Run(ctx)
<-doneRun
}
buf := bytes.NewBuffer(nil)
- log.SetOutput(io.MultiWriter(buf, os.Stderr))
- defer log.SetOutput(os.Stderr)
+ logrus.SetOutput(io.MultiWriter(buf, os.Stderr))
+ defer logrus.SetOutput(os.Stderr)
s.disp.CrunchRunCommand = []string{crunchCmd}
import (
"bytes"
"fmt"
- "log"
"sort"
"strings"
"sync"
// Squeue implements asynchronous polling monitor of the SLURM queue using the
// command 'squeue'.
type SqueueChecker struct {
+ Logger logger
Period time.Duration
PrioritySpread int64
Slurm Slurm
}
err := sqc.Slurm.Renice(job.uuid, niceNew)
if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
- log.Printf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
+ sqc.Logger.Warnf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
job.hitNiceLimit = true
}
}
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
- log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
+ sqc.Logger.Warnf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
return
}
var uuid, state, reason string
var n, p int64
if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
- log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+ sqc.Logger.Warnf("ignoring unparsed line in squeue output: %q", line)
continue
}
// "launch failed requeued held" seems to be
// another manifestation of this problem,
// resolved the same way.
- log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
+ sqc.Logger.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
sqc.Slurm.Release(uuid)
} else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
- log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
+ sqc.Logger.Warnf("job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
}
}
sqc.lock.Lock()
import (
"time"
+ "github.com/Sirupsen/logrus"
. "gopkg.in/check.v1"
)
queue: uuids[0] + " 10000 4294000000 PENDING Resources\n" + uuids[1] + " 10000 4294000111 PENDING Resources\n" + uuids[2] + " 10000 0 PENDING BadConstraints\n",
}
sqc := &SqueueChecker{
+ Logger: logrus.StandardLogger(),
Slurm: slurm,
Period: time.Hour,
}
queue: test.squeue,
}
sqc := &SqueueChecker{
+ Logger: logrus.StandardLogger(),
Slurm: slurm,
PrioritySpread: test.spread,
Period: time.Hour,
rejectNice10K: true,
}
sqc := &SqueueChecker{
+ Logger: logrus.StandardLogger(),
Slurm: slurm,
PrioritySpread: 1,
Period: time.Hour,
slurm := &slurmFake{}
sqc := &SqueueChecker{
+ Logger: logrus.StandardLogger(),
Slurm: slurm,
Period: time.Hour,
}
}
client := s3.New(auth, region)
+ if region.EC2Endpoint.Signer == aws.V4Signature {
+ // Currently affects only eu-central-1
+ client.Signature = aws.V4Signature
+ }
client.ConnectTimeout = time.Duration(v.ConnectTimeout)
client.ReadTimeout = time.Duration(v.ReadTimeout)
v.bucket = &s3bucket{
s.files = ["bin/arvados-login-sync", "agpl-3.0.txt"]
s.executables << "arvados-login-sync"
s.required_ruby_version = '>= 2.1.0'
- s.add_runtime_dependency 'arvados', '~> 1.1.0', '>= 1.1.4'
+ s.add_runtime_dependency 'arvados', '~> 1.2.0', '>= 1.2.0'
s.homepage =
'https://arvados.org'
end