Merge branch '13330-intermediates-test' of git.curoverse.com:arvados into 13330-cwl...
author Fuad Muhic <fmuhic@capeannenterprises.com>
Mon, 18 Jun 2018 12:20:21 +0000 (14:20 +0200)
committer Fuad Muhic <fmuhic@capeannenterprises.com>
Mon, 18 Jun 2018 12:20:21 +0000 (14:20 +0200)
Arvados-DCO-1.1-Signed-off-by: Fuad Muhic <fmuhic@capeannenterprises.com>

177 files changed:
.licenseignore
AUTHORS
apps/workbench/app/views/tests/mithril.html
apps/workbench/app/views/users/link_account.html.erb
apps/workbench/test/integration_helper.rb
build/build.list
build/check-copyright-notices
build/libcloud-pin.sh
build/run-build-packages.sh
build/run-tests.sh
doc/api/methods.html.textile.liquid
doc/api/methods/collections.html.textile.liquid
doc/api/methods/container_requests.html.textile.liquid
doc/api/methods/groups.html.textile.liquid
doc/api/methods/links.html.textile.liquid
doc/api/methods/nodes.html.textile.liquid
doc/css/images.css
lib/dispatchcloud/node_size.go
lib/dispatchcloud/node_size_test.go
sdk/R/R/Arvados.R
sdk/R/R/ArvadosFile.R
sdk/R/R/Collection.R
sdk/R/R/CollectionTree.R
sdk/R/R/HttpParser.R
sdk/R/R/HttpRequest.R
sdk/R/R/RESTService.R
sdk/R/R/Subcollection.R
sdk/R/R/autoGenAPI.R
sdk/R/R/util.R
sdk/R/README.Rmd
sdk/R/createDoc.R
sdk/R/install_deps.R
sdk/R/run_test.R
sdk/R/tests/testthat.R
sdk/R/tests/testthat/fakes/FakeArvados.R
sdk/R/tests/testthat/fakes/FakeHttpParser.R
sdk/R/tests/testthat/fakes/FakeHttpRequest.R
sdk/R/tests/testthat/fakes/FakeRESTService.R
sdk/R/tests/testthat/test-ArvadosFile.R
sdk/R/tests/testthat/test-Collection.R
sdk/R/tests/testthat/test-CollectionTree.R
sdk/R/tests/testthat/test-HttpParser.R
sdk/R/tests/testthat/test-HttpRequest.R
sdk/R/tests/testthat/test-RESTService.R
sdk/R/tests/testthat/test-Subcollection.R
sdk/R/tests/testthat/test-util.R
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/arvados_cwl/http.py [new file with mode: 0644]
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/arvados_version.py
sdk/cwl/setup.py
sdk/cwl/tests/12213-keepref-expr.cwl
sdk/cwl/tests/12213-keepref-job.yml
sdk/cwl/tests/12213-keepref-tool.cwl
sdk/cwl/tests/12213-keepref-wf.cwl
sdk/cwl/tests/12418-glob-empty-collection.cwl
sdk/cwl/tests/secondary/ls.cwl
sdk/cwl/tests/secondary/sub.cwl
sdk/cwl/tests/secondary/wf-job.yml
sdk/cwl/tests/secondary/wf.cwl
sdk/cwl/tests/secondaryFiles/example1.cwl
sdk/cwl/tests/secondaryFiles/example3.cwl
sdk/cwl/tests/secondaryFiles/inp3.yml
sdk/cwl/tests/secret_test_job.yml
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_http.py [new file with mode: 0644]
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_pathmapper.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/wf-defaults/default-dir1.cwl
sdk/cwl/tests/wf-defaults/default-dir2.cwl
sdk/cwl/tests/wf-defaults/default-dir3.cwl
sdk/cwl/tests/wf-defaults/default-dir4.cwl
sdk/cwl/tests/wf-defaults/default-dir5.cwl
sdk/cwl/tests/wf-defaults/default-dir6.cwl
sdk/cwl/tests/wf-defaults/default-dir6a.cwl
sdk/cwl/tests/wf-defaults/default-dir7.cwl
sdk/cwl/tests/wf-defaults/default-dir7a.cwl
sdk/cwl/tests/wf-defaults/wf1.cwl
sdk/cwl/tests/wf-defaults/wf2.cwl
sdk/cwl/tests/wf-defaults/wf3.cwl
sdk/cwl/tests/wf-defaults/wf4.cwl
sdk/cwl/tests/wf-defaults/wf5.cwl
sdk/cwl/tests/wf-defaults/wf6.cwl
sdk/cwl/tests/wf-defaults/wf7.cwl
sdk/cwl/tests/wf/check_mem.py
sdk/cwl/tests/wf/echo-subwf.cwl
sdk/cwl/tests/wf/echo-wf.cwl
sdk/cwl/tests/wf/echo_a.cwl
sdk/cwl/tests/wf/echo_b.cwl
sdk/cwl/tests/wf/secret_job.cwl
sdk/cwl/tests/wf/secret_wf.cwl
sdk/go/arvados/config.go
sdk/go/arvados/container.go
sdk/go/arvados/fs_filehandle.go
sdk/go/health/aggregator.go
sdk/go/health/aggregator_test.go
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
sdk/go/keepclient/support.go
sdk/python/arvados/collection.py
sdk/python/arvados/commands/put.py
sdk/python/arvados/commands/run.py
sdk/python/setup.py
sdk/python/tests/test_arv_put.py
sdk/python/tests/test_collections.py
services/api/app/controllers/arvados/v1/containers_controller.rb
services/api/app/models/container.rb
services/api/app/models/container_request.rb
services/api/app/models/group.rb
services/api/config/application.default.yml
services/api/db/migrate/20170704160233_yaml_to_json.rb
services/api/db/migrate/20170706141334_json_collection_properties.rb
services/api/db/migrate/20171027183824_add_index_to_containers.rb
services/api/db/migrate/20171208203841_fix_trash_flag_follow.rb
services/api/db/migrate/20171212153352_add_gin_index_to_collection_properties.rb
services/api/db/migrate/20180228220311_add_secret_mounts_to_containers.rb
services/api/db/migrate/20180313180114_change_container_priority_bigint.rb
services/api/db/migrate/20180501182859_add_redirect_to_user_uuid_to_users.rb
services/api/db/migrate/20180514135529_add_container_auth_uuid_index.rb
services/api/db/migrate/20180607175050_properties_to_jsonb.rb [new file with mode: 0644]
services/api/db/migrate/20180608123145_add_properties_to_groups.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/lib/update_priority.rb [new file with mode: 0644]
services/api/test/unit/arvados_model_test.rb
services/api/test/unit/container_request_test.rb
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-run/git_mount.go
services/fuse/setup.py
services/health/main.go
services/keep-balance/balance.go
services/keep-balance/balance_test.go
services/keep-balance/main.go
services/keep-balance/usage.go
services/keep-web/webdav_test.go
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go
services/keepstore/s3_volume.go
services/nodemanager/arvnodeman/computenode/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/driver/__init__.py
services/nodemanager/arvnodeman/computenode/driver/azure.py
services/nodemanager/arvnodeman/computenode/driver/ec2.py
services/nodemanager/arvnodeman/computenode/driver/gce.py
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/arvnodeman/nodelist.py
services/nodemanager/arvnodeman/test/fake_driver.py
services/nodemanager/setup.py
services/nodemanager/tests/fake_azure.cfg.template
services/nodemanager/tests/fake_ec2.cfg.template
services/nodemanager/tests/fake_gce.cfg.template
services/nodemanager/tests/integration_test.py
services/nodemanager/tests/test_computenode.py
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_computenode_driver.py
services/nodemanager/tests/test_computenode_driver_azure.py
services/nodemanager/tests/test_computenode_driver_ec2.py
services/nodemanager/tests/test_computenode_driver_gce.py
services/nodemanager/tests/test_config.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_failure.py
services/nodemanager/tests/test_nodelist.py
services/nodemanager/tests/testutil.py
tools/arvbox/lib/arvbox/docker/runit/1
tools/arvbox/lib/arvbox/docker/runit/2
tools/arvbox/lib/arvbox/docker/runit/3
tools/arvbox/lib/arvbox/docker/runit/ctrlaltdel
tools/crunchstat-summary/crunchstat_summary/synchronizer.js

index 51980b16c2ffcbdd2ab93729c4677048b5160c39..51a1e7cbd2f0aabca972527475630923f1f1ef75 100644 (file)
@@ -26,6 +26,8 @@ docker/jobs/apt.arvados.org.list
 *.gz.report
 *.ico
 *.jpg
+*.svg
+*.odg
 *.json
 *LICENSE*.html
 .licenseignore
@@ -59,3 +61,11 @@ services/arv-web/sample-cgi-app/public/index.cgi
 services/keepproxy/pkg-extras/etc/default/keepproxy
 *.tar
 tools/crunchstat-summary/tests/crunchstat_error_messages.txt
+tools/crunchstat-summary/crunchstat_summary/synchronizer.js
+build/package-build-dockerfiles/debian9/D39DC0E3.asc
+build/package-test-dockerfiles/debian9/D39DC0E3.asc
+sdk/R/DESCRIPTION
+sdk/R/NAMESPACE
+sdk/R/.Rbuildignore
+sdk/R/ArvadosR.Rproj
+*.Rd
diff --git a/AUTHORS b/AUTHORS
index ea9fa4c7092e8c2069a2105d8eafb25e6107d3ab..9a861a6315099a8faec86b854d8737078adc22b7 100644 (file)
--- a/AUTHORS
+++ b/AUTHORS
@@ -17,3 +17,4 @@ Joshua Randall <joshua.randall@sanger.ac.uk>
 President and Fellows of Harvard College <*@harvard.edu>
 Thomas Mooney <tmooney@genome.wustl.edu>
 Chen Chen <aflyhorse@gmail.com>
+Veritas Genetics, Inc. <*@veritasgenetics.com>
index a629eb75fda0a8140e00d411543528e05765d4ae..fac2d88c50586e844754a2016bfd9d04dfde72b5 100644 (file)
@@ -1 +1,5 @@
+<!-- Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: AGPL-3.0 -->
+
 <div data-mount-mithril="TestComponent"></div>
index 61063b8edaaeecfe7f333ff7ed17da3645486c36..86a0446e76e07603b079ac50465a63a3885c81cb 100644 (file)
@@ -1,3 +1,7 @@
+<%# Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: AGPL-3.0 %>
+
 <%= javascript_tag do %>
   function update_visibility() {
     if (sessionStorage.getItem('link_account_api_token') &&
index ef2779cc3e78eedb556ce2dc7114a6e2466112b0..33e50087e77d127e9c30991860b92315aade3d33 100644 (file)
@@ -221,6 +221,8 @@ class ActionDispatch::IntegrationTest
     end
     if Capybara.current_driver == :selenium
       page.execute_script("window.localStorage.clear()")
+    else
+      page.driver.restart if defined?(page.driver.restart)
     end
     Capybara.reset_sessions!
   end
index e994a2d669eeaed62438f57cfd68bcbb4458ee93..fa1a260c3a225fcbf29bfccb17e364b1f7277000 100644 (file)
@@ -5,7 +5,6 @@
 #distribution(s)|name|version|iteration|type|architecture|extra fpm arguments
 debian8,debian9,centos7|python-gflags|2.0|2|python|all
 debian8,debian9,ubuntu1404,ubuntu1604,centos7|google-api-python-client|1.6.2|2|python|all
-debian8,debian9,ubuntu1404,ubuntu1604,centos7|apache-libcloud|2.3.0|3|python|all|--depends 'python-requests >= 2.4.3'
 debian8,debian9,ubuntu1404,centos7|oauth2client|1.5.2|2|python|all
 debian8,debian9,ubuntu1404,centos7|pyasn1|0.1.7|2|python|all
 debian8,debian9,ubuntu1404,centos7|pyasn1-modules|0.0.5|2|python|all
@@ -42,9 +41,9 @@ centos7|pbr|0.11.1|2|python|all
 centos7|pyparsing|2.1.10|2|python|all
 centos7|keepalive|0.5|2|python|all
 debian8,debian9,ubuntu1404,ubuntu1604,centos7|lockfile|0.12.2|2|python|all|--epoch 1
-debian8,debian9,ubuntu1404,ubuntu1604,centos7|subprocess32|3.5.0rc1|2|python|all
+debian8,debian9,ubuntu1404,ubuntu1604,centos7|subprocess32|3.5.1|2|python|all
 all|ruamel.yaml|0.14.12|2|python|amd64|--python-setup-py-arguments --single-version-externally-managed
-all|cwltest|1.0.20180416154033|4|python|all|--depends 'python-futures >= 3.0.5' --depends 'python-subprocess32'
+all|cwltest|1.0.20180518074130|4|python|all|--depends 'python-futures >= 3.0.5' --depends 'python-subprocess32 >= 3.5.0'
 all|junit-xml|1.8|3|python|all
 all|rdflib-jsonld|0.4.0|2|python|all
 all|futures|3.0.5|2|python|all
index f087188991c5c06a3b19a3b1e38325d9d29e5c52..2a40b50ec1f5b94c2523e293871d04005d962973 100755 (executable)
@@ -96,7 +96,7 @@ do
             | */nodemanager/doc/*.cfg \
             | */nodemanager/tests/fake*.cfg.template \
             | */nginx.conf \
-            | build/build.list)
+            | build/build.list | *.R)
             fixer=fixer
             cc="#"
             ;;
@@ -175,7 +175,7 @@ ${cc}${cc:+ }SPDX-License-Identifier: CC-BY-SA-3.0${ce}"
     wantBYSAmd="[comment]: # (Copyright © The Arvados Authors. All rights reserved.)
 [comment]: # ()
 [comment]: # (SPDX-License-Identifier: CC-BY-SA-3.0)"
-    found=$(head -n20 "$fnm" | egrep -A${grepAfter} -B${grepBefore} 'Copyright.*Arvados' || true)
+    found=$(head -n20 "$fnm" | egrep -A${grepAfter} -B${grepBefore} 'Copyright.*All rights reserved.' || true)
     case ${fnm} in
         Makefile | build/* | lib/* | tools/* | apps/* | services/* | sdk/cli/bin/crunch-job)
             want=${wantGPL}
index cfbba404504e3b7c60d553040fb64c97e3698f77..bb66c6b218c020c5d038c1e5e7b51f8681043db9 100644 (file)
@@ -2,9 +2,9 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-LIBCLOUD_PIN=2.3.0
+LIBCLOUD_PIN=2.3.1.dev1
 
-using_fork=false
+using_fork=true
 if [[ $using_fork = true ]]; then
     LIBCLOUD_PIN_SRC="https://github.com/curoverse/libcloud/archive/apache-libcloud-$LIBCLOUD_PIN.zip"
 else
index fb37d53774982f7704c44ad71ce5de329b2fc64c..351d1b2a1f3df666c7fc90df6f456c10522c2dcf 100755 (executable)
@@ -352,7 +352,7 @@ else
 fi
 test_package_presence ${PYTHON2_PKG_PREFIX}-arvados-cwl-runner "$arvados_cwl_runner_version" python "$arvados_cwl_runner_iteration"
 if [[ "$?" == "0" ]]; then
-  fpm_build $WORKSPACE/sdk/cwl "${PYTHON2_PKG_PREFIX}-arvados-cwl-runner" 'Curoverse, Inc.' 'python' "$arvados_cwl_runner_version" "--url=https://arvados.org" "--description=The Arvados CWL runner" --depends "${PYTHON2_PKG_PREFIX}-setuptools" --depends "${PYTHON2_PKG_PREFIX}-subprocess32 >= 3.5.0rc1" --depends "${PYTHON2_PKG_PREFIX}-pathlib2" --depends "${PYTHON2_PKG_PREFIX}-scandir" "${iterargs[@]}"
+  fpm_build $WORKSPACE/sdk/cwl "${PYTHON2_PKG_PREFIX}-arvados-cwl-runner" 'Curoverse, Inc.' 'python' "$arvados_cwl_runner_version" "--url=https://arvados.org" "--description=The Arvados CWL runner" --depends "${PYTHON2_PKG_PREFIX}-setuptools" --depends "${PYTHON2_PKG_PREFIX}-subprocess32 >= 3.5.0" --depends "${PYTHON2_PKG_PREFIX}-pathlib2" --depends "${PYTHON2_PKG_PREFIX}-scandir" "${iterargs[@]}"
 fi
 
 # schema_salad. This is a python dependency of arvados-cwl-runner,
@@ -434,8 +434,30 @@ if [[ "$?" == "0" ]]; then
   fpm_build $WORKSPACE/tools/crunchstat-summary ${PYTHON2_PKG_PREFIX}-crunchstat-summary 'Curoverse, Inc.' 'python' "$crunchstat_summary_version" "--url=https://arvados.org" "--description=Crunchstat-summary reads Arvados Crunch log files and summarize resource usage" --iteration "$iteration"
 fi
 
-## if libcloud becomes our own fork see
-## https://dev.arvados.org/issues/12268#note-27
+# Forked libcloud
+if test_package_presence "$PYTHON2_PKG_PREFIX"-apache-libcloud "$LIBCLOUD_PIN" python 2
+then
+  LIBCLOUD_DIR=$(mktemp -d)
+  (
+      cd $LIBCLOUD_DIR
+      git clone $DASHQ_UNLESS_DEBUG https://github.com/curoverse/libcloud.git .
+      git checkout $DASHQ_UNLESS_DEBUG apache-libcloud-$LIBCLOUD_PIN
+      # libcloud is absurdly noisy without -q, so force -q here
+      OLD_DASHQ_UNLESS_DEBUG=$DASHQ_UNLESS_DEBUG
+      DASHQ_UNLESS_DEBUG=-q
+      handle_python_package
+      DASHQ_UNLESS_DEBUG=$OLD_DASHQ_UNLESS_DEBUG
+  )
+
+  # libcloud >= 2.3.0 now requires python-requests 2.4.3 or higher, otherwise
+  # it throws
+  #   ImportError: No module named packages.urllib3.poolmanager
+  # when loaded. We only see this problem on ubuntu1404, because that is our
+  # only supported distribution that ships with a python-requests older than
+  # 2.4.3.
+  fpm_build $LIBCLOUD_DIR "$PYTHON2_PKG_PREFIX"-apache-libcloud "" python "" --iteration 2 --depends 'python-requests >= 2.4.3'
+  rm -rf $LIBCLOUD_DIR
+fi
 
 # Python 2 dependencies
 declare -a PIP_DOWNLOAD_SWITCHES=(--no-deps)
index 8a8f5b6d240ad29fd729b5936e5befb8ffbe50fa..7d3646c9fbf2dfc0c4c4d21cf23e8c0c4dc5348b 100755 (executable)
@@ -270,6 +270,8 @@ declare -a failures
 declare -A skip
 declare -A testargs
 skip[apps/workbench_profile]=1
+# nodemanager_integration tests are not reliable, see #12061.
+skip[services/nodemanager_integration]=1
 
 while [[ -n "$1" ]]
 do
@@ -512,13 +514,20 @@ export GOPATH
     set -e
     mkdir -p "$GOPATH/src/git.curoverse.com"
     rmdir -v --parents --ignore-fail-on-non-empty "${temp}/GOPATH"
+    if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
+        for d in \
+            "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
+                "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
+                "$GOPATH/src/git.curoverse.com/arvados.git"; do
+            [[ -d "$d" ]] && rmdir "$d"
+        done
+    fi
     for d in \
-        "$GOPATH/src/git.curoverse.com/arvados.git/arvados.git" \
-            "$GOPATH/src/git.curoverse.com/arvados.git"; do
-        [[ -d "$d" ]] && rmdir "$d"
+        "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
+        "$GOPATH/src/git.curoverse.com/arvados.git"; do
         [[ -h "$d" ]] && rm "$d"
     done
-    ln -vsnfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+    ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
     go get -v github.com/kardianos/govendor
     cd "$GOPATH/src/git.curoverse.com/arvados.git"
     if [[ -n "$short" ]]; then
index 00c120d9f8f1be4aad90022b514fe37024618dc3..937ae706d66295055ffbca485c1b587bc5c40739 100644 (file)
@@ -98,7 +98,7 @@ table(table table-bordered table-condensed).
 |@is_a@|string|Arvados object type|@["head_uuid","is_a","arvados#collection"]@|
 |@exists@|string|Test if a subproperty is present.|@["properties","exists","my_subproperty"]@|
 
-h4. Filtering on subproperties
+h4(#subpropertyfilters). Filtering on subproperties
 
 Some record type have an additional @properties@ attribute that allows recording and filtering on additional key-value pairs.  To filter on a subproperty, the value in the @attribute@ position has the form @properties.user_property@.  You may also use JSON-LD / RDF style URIs for property keys by enclosing them in @<...>@ for example @properties.<http://example.com/user_property>@.  Alternately you may also provide a JSON-LD "@context" field, however at this time JSON-LD contexts are not interpreted by Arvados.
 
index d753f0990f71facaa7580ac1a3bee8d1f69829a5..f761c665e57ad811085098c3145ec34ff0fd642b 100644 (file)
@@ -27,6 +27,7 @@ table(table table-bordered table-condensed).
 |_. Attribute|_. Type|_. Description|_. Example|
 |name|string|||
 |description|text|||
+|properties|hash|User-defined metadata, may be used in queries using "subproperty filters":{{site.baseurl}}/api/methods.html#subpropertyfilters ||
 |portable_data_hash|string|The MD5 sum of the manifest text stripped of block hints other than the size hint.||
 |manifest_text|text|||
 |replication_desired|number|Minimum storage replication level desired for each data block referenced by this collection. A value of @null@ signifies that the site default replication level (typically 2) is desired.|@2@|
index 1c2550f723f5d8d96241ff12b9d5c09cf136e512..0e2e8ce7c6135490e61585594471080ce1ae1719 100644 (file)
@@ -29,7 +29,7 @@ table(table table-bordered table-condensed).
 |_. Attribute|_. Type|_. Description|_. Notes|
 |name|string|The name of the container_request.||
 |description|string|The description of the container_request.||
-|properties|hash|Client-defined structured data that does not affect how the container is run.||
+|properties|hash|User-defined metadata that does not affect how the container is run.  May be used in queries using "subproperty filters":{{site.baseurl}}/api/methods.html#subpropertyfilters||
 |state|string|The allowed states are "Uncommitted", "Committed", and "Final".|Once a request is Committed, the only attributes that can be modified are priority, container_uuid, and container_count_max. A request in the "Final" state cannot have any of its functional parts modified (i.e., only name, description, and properties fields can be modified).|
 |requesting_container_uuid|string|The uuid of the parent container that created this container_request, if any. Represents a process tree.|The priority of this container_request is inherited from the parent container, if the parent container is cancelled, this container_request will be cancelled as well.|
 |container_uuid|string|The uuid of the container that satisfies this container_request. The system may return a preexisting Container that matches the container request criteria. See "Container reuse":#container_reuse for more details.|Container reuse is the default behavior, but may be disabled with @use_existing: false@ to always create a new container.|
index 2716056caac06ca0976ccd595b0cfa89ae17d438..d4ef5ebb78c434312c0b83a31585d07706781e84 100644 (file)
@@ -28,6 +28,7 @@ table(table table-bordered table-condensed).
 |group_class|string|Type of group. This does not affect behavior, but determines how the group is presented in the user interface. For example, @project@ indicates that the group should be displayed by Workbench and arv-mount as a project for organizing and naming objects.|@"project"@
 null|
 |description|text|||
+|properties|hash|User-defined metadata, may be used in queries using "subproperty filters":{{site.baseurl}}/api/methods.html#subpropertyfilters ||
 |writable_by|array|List of UUID strings identifying Users and other Groups that have write permission for this Group.  Only users who are allowed to administer the Group will receive a full list.  Other users will receive a partial list that includes the Group's owner_uuid and (if applicable) their own user UUID.||
 |trash_at|datetime|If @trash_at@ is non-null and in the past, this group and all objects directly or indirectly owned by the group will be hidden from API calls.  May be untrashed.||
 |delete_at|datetime|If @delete_at@ is non-null and in the past, the group and all objects directly or indirectly owned by the group may be permanently deleted.||
index ec5d53010456bb36e239927bcdb563f1c6467e7e..04643443e680e4170df952aeb802f3dcf4eea9c7 100644 (file)
@@ -29,7 +29,7 @@ table(table table-bordered table-condensed).
 |tail_uuid|string|The origin or actor in the description or action (may be null).|
 |link_class|string|Type of link|
 |name|string|Primary value of the link.|
-|properties|hash|Additional information, expressed as a key&rarr;value hash. Key: string. Value: string, number, array, or hash.|
+|properties|hash|Additional information, expressed as a key&rarr;value hash. Key: string. Value: string, number, array, or hash.  May be used in queries using "subproperty filters":{{site.baseurl}}/api/methods.html#subpropertyfilters|
 
 h2. Link classes
 
index 1b51f01c632ff1e4387d94355f5c6138ec01504f..7ddc62519c1922ad48a254827078f7b0651065ea 100644 (file)
@@ -32,7 +32,8 @@ table(table table-bordered table-condensed).
 |job_uuid|string|The UUID of the job that this node is assigned to work on.  If you do not have permission to read the job, this will be null.||
 |first_ping_at|datetime|||
 |last_ping_at|datetime|||
-|info|hash|||
+|info|hash|Sensitive information about the node (only visible to admin) such as 'ping_secret' and 'ec2_instance_id'. May be used in queries using "subproperty filters":{{site.baseurl}}/api/methods.html#subpropertyfilters||
+|properties|hash|Public information about the node, such as 'total_cpu_cores', 'total_ram_mb', and 'total_scratch_mb'.  May be used in queries using "subproperty filters":{{site.baseurl}}/api/methods.html#subpropertyfilters||
 
 h2. Methods
 
index f5245b38685812af85d9342d56fbbb03721f16a2..0bd2ec7f0c4a55ee8755643c4d4cc22a4a2935ec 100644 (file)
@@ -1,3 +1,7 @@
+/* Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0 */
+
 img.full-width {
     width: 100%
 }
index 2ca405060390c65df2f961f7c7a83e5a278d0687..b5fd0262a8e430ce9cdaac4c58c5078fe7e1e314 100644 (file)
@@ -8,6 +8,7 @@ import (
        "errors"
        "log"
        "os/exec"
+       "sort"
        "strings"
        "time"
 
@@ -15,11 +16,17 @@ import (
 )
 
 var (
-       ErrConstraintsNotSatisfiable  = errors.New("constraints not satisfiable by any configured instance type")
        ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
        discountConfiguredRAMPercent  = 5
 )
 
+// ConstraintsNotSatisfiableError includes a list of available instance types
+// to be reported back to the user.
+type ConstraintsNotSatisfiableError struct {
+       error
+       AvailableTypes []arvados.InstanceType
+}
+
 // ChooseInstanceType returns the cheapest available
 // arvados.InstanceType big enough to run ctr.
 func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) {
@@ -40,13 +47,22 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
        needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
        needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
 
-       err = ErrConstraintsNotSatisfiable
+       availableTypes := make([]arvados.InstanceType, len(cc.InstanceTypes))
+       copy(availableTypes, cc.InstanceTypes)
+       sort.Slice(availableTypes, func(a, b int) bool {
+               return availableTypes[a].Price < availableTypes[b].Price
+       })
+       err = ConstraintsNotSatisfiableError{
+               errors.New("constraints not satisfiable by any configured instance type"),
+               availableTypes,
+       }
        for _, it := range cc.InstanceTypes {
                switch {
                case err == nil && it.Price > best.Price:
                case it.Scratch < needScratch:
                case it.RAM < needRAM:
                case it.VCPUs < needVCPUs:
+               case it.Preemptable != ctr.SchedulingParameters.Preemptable:
                case it.Price == best.Price && (it.RAM < best.RAM || it.VCPUs < best.VCPUs):
                        // Equal price, but worse specs
                default:
index 0c02a0e3e1be45bfeb6b2371287a4ce664de1d98..d6b7c6bf9f49196e312d866e11efc5d9c6c83733 100644 (file)
@@ -32,7 +32,7 @@ func (*NodeSizeSuite) TestChooseUnsatisfiable(c *check.C) {
                        {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "small2"},
                        {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "small4", Scratch: GiB},
                }}, ctr)
-               c.Check(err, check.Equals, ErrConstraintsNotSatisfiable)
+               c.Check(err, check.FitsTypeOf, ConstraintsNotSatisfiableError{})
        }
 
        for _, rc := range []arvados.RuntimeConstraints{
@@ -91,3 +91,31 @@ func (*NodeSizeSuite) TestChoose(c *check.C) {
                c.Check(best.Scratch >= 2*GiB, check.Equals, true)
        }
 }
+
+func (*NodeSizeSuite) TestChoosePreemptable(c *check.C) {
+       menu := []arvados.InstanceType{
+               {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Preemptable: true, Name: "costly"},
+               {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "almost best"},
+               {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Preemptable: true, Name: "best"},
+               {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Preemptable: true, Name: "small"},
+       }
+       best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu}, &arvados.Container{
+               Mounts: map[string]arvados.Mount{
+                       "/tmp": {Kind: "tmp", Capacity: 2 * GiB},
+               },
+               RuntimeConstraints: arvados.RuntimeConstraints{
+                       VCPUs:        2,
+                       RAM:          987654321,
+                       KeepCacheRAM: 123456789,
+               },
+               SchedulingParameters: arvados.SchedulingParameters{
+                       Preemptable: true,
+               },
+       })
+       c.Check(err, check.IsNil)
+       c.Check(best.Name, check.Equals, "best")
+       c.Check(best.RAM >= 1234567890, check.Equals, true)
+       c.Check(best.VCPUs >= 2, check.Equals, true)
+       c.Check(best.Scratch >= 2*GiB, check.Equals, true)
+       c.Check(best.Preemptable, check.Equals, true)
+}
index 644725c2e2b42f05fca8d13c20e809f53d59ed5c..0ec2d115295749067ceb4ee105245aad73df149f 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 #' users.get
 #' 
 #' users.get is a method defined in Arvados class.
index e28ba9606cfebd95a89a436b7ac9953c98d63fb8..8f737831c4634cc09a3121a86e04dcbf0361946b 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 source("./R/util.R")
 
 #' ArvadosFile
index fad452ac7a05e97ab1ceabac0696028085426c9e..e23da138329786cba49e3a8001479461dd30be77 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 source("./R/Subcollection.R")
 source("./R/ArvadosFile.R")
 source("./R/RESTService.R")
index 91e4ec86459dc8e4ad8891d59cbdb80d771a4013..8686f88c1a8a3c55b695351b9993df55939d0f1a 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 source("./R/Subcollection.R")
 source("./R/ArvadosFile.R")
 source("./R/util.R")
index 5df8287fdce7b85f2b83003ac7e55720afc39645..8ce68f3837f158486534c6adc55e4ff23e9386e1 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 HttpParser <- R6::R6Class(
 
     "HttrParser",
index bc6b4d406d1f801acb7328a5d9263b44a473b2a7..95dd375debe5ce076638c55de49a57db1f2d8f0d 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 source("./R/util.R")
 
 HttpRequest <- R6::R6Class(
index dacf88a8c4bff2336e232fba4a86567a2cc5d7af..ac65d0df3f37b6baa6031bc8cbab71b163e27a76 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 RESTService <- R6::R6Class(
 
     "RESTService",
@@ -37,8 +41,8 @@ RESTService <- R6::R6Class(
 
                 headers <- list(Authorization = paste("OAuth2", self$token))
 
-                serverResponse <- self$http$execute("GET", discoveryDocumentURL, headers,
-                                                    retryTimes = self$numRetries)
+                serverResponse <- self$http$exec("GET", discoveryDocumentURL, headers,
+                                                 retryTimes = self$numRetries)
 
                 discoveryDocument <- self$httpParser$parseJSONResponse(serverResponse)
                 private$webDavHostName <- discoveryDocument$keepWebServiceUrl
@@ -64,8 +68,8 @@ RESTService <- R6::R6Class(
                               uuid, "/", relativePath);
             headers <- list(Authorization = paste("OAuth2", self$token)) 
 
-            serverResponse <- self$http$execute("DELETE", fileURL, headers,
-                                                retryTimes = self$numRetries)
+            serverResponse <- self$http$exec("DELETE", fileURL, headers,
+                                             retryTimes = self$numRetries)
 
             if(serverResponse$status_code < 200 || serverResponse$status_code >= 300)
                 stop(paste("Server code:", serverResponse$status_code))
@@ -82,8 +86,8 @@ RESTService <- R6::R6Class(
             headers <- list("Authorization" = paste("OAuth2", self$token),
                            "Destination" = toURL)
 
-            serverResponse <- self$http$execute("MOVE", fromURL, headers,
-                                                retryTimes = self$numRetries)
+            serverResponse <- self$http$exec("MOVE", fromURL, headers,
+                                             retryTimes = self$numRetries)
 
             if(serverResponse$status_code < 200 || serverResponse$status_code >= 300)
                 stop(paste("Server code:", serverResponse$status_code))
@@ -98,8 +102,8 @@ RESTService <- R6::R6Class(
 
             headers <- list("Authorization" = paste("OAuth2", self$token))
 
-            response <- self$http$execute("PROPFIND", collectionURL, headers,
-                                          retryTimes = self$numRetries)
+            response <- self$http$exec("PROPFIND", collectionURL, headers,
+                                       retryTimes = self$numRetries)
 
             if(all(response == ""))
                 stop("Response is empty, request may be misconfigured")
@@ -119,8 +123,8 @@ RESTService <- R6::R6Class(
 
             headers <- list("Authorization" = paste("OAuth2", self$token))
 
-            response <- self$http$execute("PROPFIND", subcollectionURL, headers,
-                                          retryTimes = self$numRetries)
+            response <- self$http$exec("PROPFIND", subcollectionURL, headers,
+                                       retryTimes = self$numRetries)
 
             if(all(response == ""))
                 stop("Response is empty, request may be misconfigured")
@@ -156,8 +160,8 @@ RESTService <- R6::R6Class(
             if(!(contentType %in% self$httpParser$validContentTypes))
                 stop("Invalid contentType. Please use text or raw.")
 
-            serverResponse <- self$http$execute("GET", fileURL, headers,
-                                                retryTimes = self$numRetries)
+            serverResponse <- self$http$exec("GET", fileURL, headers,
+                                             retryTimes = self$numRetries)
 
             if(serverResponse$status_code < 200 || serverResponse$status_code >= 300)
                 stop(paste("Server code:", serverResponse$status_code))
@@ -173,8 +177,8 @@ RESTService <- R6::R6Class(
                             "Content-Type" = contentType)
             body <- content
 
-            serverResponse <- self$http$execute("PUT", fileURL, headers, body,
-                                                retryTimes = self$numRetries)
+            serverResponse <- self$http$exec("PUT", fileURL, headers, body,
+                                             retryTimes = self$numRetries)
 
             if(serverResponse$status_code < 200 || serverResponse$status_code >= 300)
                 stop(paste("Server code:", serverResponse$status_code))
@@ -210,8 +214,8 @@ RESTService <- R6::R6Class(
                             "Content-Type" = contentType)
             body <- NULL
 
-            serverResponse <- self$http$execute("PUT", fileURL, headers, body,
-                                                retryTimes = self$numRetries)
+            serverResponse <- self$http$exec("PUT", fileURL, headers, body,
+                                             retryTimes = self$numRetries)
 
             if(serverResponse$status_code < 200 || serverResponse$status_code >= 300)
                 stop(paste("Server code:", serverResponse$status_code))
index b3b01f89c18eb07deabbefced545e34ad1f1b18f..60714a4ad835b9bc201fb780bb38b5fb8a81461c 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 source("./R/util.R")
 
 #' Subcollection
index 6db28f9dac63c070df26023da66c87e9b0124d67..3e8c2fa0cf2b1494c33a7246a9f97a8669a3b514 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 getAPIDocument <- function(){
     url <- "https://4xphq.arvadosapi.com/discovery/v1/apis/arvados/v1/rest"
     serverResponse <- httr::RETRY("GET", url = url)
index 57dd75f228ea410800897b60031d5734ca215f85..f796cb7b87eca67b3de28d5929221d792554b047 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 #' listAll
 #'
 #' List all resources even if the number of items is greater than maximum API limit.
index cfa0ce5ae6d94b79f16b7fb7ebd6e6dc035f8cf4..dcfa2186e9edba13493919c6e4eb192efa03c544 100644 (file)
@@ -1,3 +1,7 @@
+[comment]: # (Copyright (c) The Arvados Authors. All rights reserved.)
+[comment]: # ()
+[comment]: # (SPDX-License-Identifier: CC-BY-SA-3.0)
+
 ## R SDK for Arvados
 
 This SDK focuses on providing support for accessing Arvados projects, collections, and the files within collections.
index 73e088ecb61f1e299b1a801330448a42f8cf02cc..5decab9af3c8ff185dc9278e96a26be7462924c2 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 #Run script with $Rscript createDoc.R input.Rmd output.html
 
 require(knitr) # required for knitting from rmd to md
index 5314c86e28b0d3d7352733943019a3602dbf019b..593129bb3ceeb18bbc6cb2520529fd5067b823b1 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
 if (!requireNamespace("devtools")) {
   install.packages("devtools")
index 1f8931d917969115a382b61b2ac378e47e665764..156dde1080c5040373d55633ff8a689a8867484a 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 results <- devtools::test()
 any_error <- any(as.data.frame(results)$error)
 if (any_error) {
index 18ef411fd644144ac34ba76203f8bc6d8f793f17..9ca4f86fb67d76b3a0abc0b16734788e6fff882b 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 library(testthat)
 library(ArvadosR)
 
index 5886ff761f6d0b3b586397ac36562f9aa385eb0d..4fcfd6c67e53f12c8bbd9908d750b1a52c756e07 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 FakeArvados <- R6::R6Class(
 
     "FakeArvados",
index 865234d83552db7965f1d4077085a9f83c65ec4e..c97572c193f1eadbd315928fb09d56aff5e2d7a2 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 FakeHttpParser <- R6::R6Class(
 
     "FakeHttrParser",
index 533602886ab09e0d34a49e2829acaf73a9051baa..2633abdf2c745bf0e4c9afcee1b73b7c5751fbeb 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 FakeHttpRequest <- R6::R6Class(
 
     "FakeHttpRequest",
@@ -56,8 +60,8 @@ FakeHttpRequest <- R6::R6Class(
             self$serverMaxElementsPerRequest <- 5
         },
 
-        execute = function(verb, url, headers = NULL, body = NULL, query = NULL,
-                           limit = NULL, offset = NULL, retryTimes = 0)
+        exec = function(verb, url, headers = NULL, body = NULL, query = NULL,
+                        limit = NULL, offset = NULL, retryTimes = 0)
         {
             private$validateURL(url)
             private$validateHeaders(headers)
index d370e87fbe7e3ca581e4a36ac3a2a149989f18e1..08e8717de5e4b97b5776c2c6cc8893c523f4c133 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 FakeRESTService <- R6::R6Class(
 
     "FakeRESTService",
index 0c1db1a6417eb65cac844428b9dc6bdc44fbb0b5..fb14888aab91b982d88dbdddca0be9589f757fb8 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 source("fakes/FakeRESTService.R")
 
 context("ArvadosFile")
index ec00ca3c66dbcc66d875abee3d810a9ba06b9cd9..c3c70910e4c63acea6d86f5df71cc9bab9f3e72f 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 source("fakes/FakeRESTService.R")
 
 context("Collection")
index 42a54bf69422a31235768488ff2839716011d25d..5c8a40526988bb562c45b5702fd921a743f0a77c 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 context("CollectionTree")
 
 test_that("constructor creates file tree from character array properly", {
index b2862128c261f9cf8b8634ebcc384fe3113a286d..a119d88bf82fa226e26d5127f3ae001d1b515a2e 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 context("Http Parser")
 
 
index 5c2fb602fdb1edb1bdee38e93b831a35c34bb6f0..5ad8aa03115207035ee7f369ded5fbcd597e0ba7 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 context("Http Request")
 
 
index 898e59e3be89ef1c71aba81aca223b8d581c31a9..859b6180f3380c2d834b99e126aa0c7761155368 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 source("fakes/FakeArvados.R")
 source("fakes/FakeHttpRequest.R")
 source("fakes/FakeHttpParser.R")
index b2b0bc9ccfecc66998e43f7d7b6de40c31a6d68e..e025586c58a968f6c0d61a47512087a69d601635 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 source("fakes/FakeRESTService.R")
 
 context("Subcollection")
index ea091517c6b8abdf2f777e46046d8786623d1edc..9f5e07c1767af6c089274a308dc3dc270fb25c2f 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 context("Utility function")
 
 test_that("listAll always returns all resource items from server", {
index d509f400f1058396f2fc91e6ef320a2bbebe92e1..5b29ae517e8b15c33781cd70247c36d2ae831b94 100644 (file)
@@ -37,7 +37,7 @@ import arvados.commands._util as arv_cmd
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
@@ -76,7 +76,6 @@ class ArvCwlRunner(object):
         self.workflow_eval_lock = threading.Condition(threading.RLock())
         self.final_output = None
         self.final_status = None
-        self.uploaded = {}
         self.num_retries = num_retries
         self.uuid = None
         self.stop_polling = threading.Event()
@@ -238,12 +237,6 @@ class ArvCwlRunner(object):
         finally:
             self.stop_polling.set()
 
-    def get_uploaded(self):
-        return self.uploaded.copy()
-
-    def add_uploaded(self, src, pair):
-        self.uploaded[src] = pair
-
     def add_intermediate_output(self, uuid):
         if uuid:
             self.intermediate_output_collections.append(uuid)
@@ -401,6 +394,9 @@ class ArvCwlRunner(object):
         if self.intermediate_output_ttl < 0:
             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
 
+        if kwargs.get("submit_request_uuid") and self.work_api != "containers":
+            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+
         if not kwargs.get("name"):
             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
 
@@ -706,6 +702,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                         default=None)
 
+    parser.add_argument("--submit-request-uuid", type=str,
+                        default=None,
+                        help="Update and commit supplied container request instead of creating a new one (containers API only).")
+
     parser.add_argument("--name", type=str,
                         help="Name to use for workflow execution instance.",
                         default=None)
index 4e7811d2e8f5b0b477b82334b79385618c3456b9..0bec692643ad805c02d6b8358fae8a65841c1367 100644 (file)
@@ -51,7 +51,6 @@ class ArvadosContainer(object):
 
         container_request = {
             "command": self.command_line,
-            "owner_uuid": self.arvrunner.project_uuid,
             "name": self.name,
             "output_path": self.outdir,
             "cwd": self.outdir,
@@ -61,6 +60,9 @@ class ArvadosContainer(object):
         }
         runtime_constraints = {}
 
+        if self.arvrunner.project_uuid:
+            container_request["owner_uuid"] = self.arvrunner.project_uuid
+
         if self.arvrunner.secret_store.has_secret(self.command_line):
             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
 
@@ -251,9 +253,15 @@ class ArvadosContainer(object):
         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
 
         try:
-            response = self.arvrunner.api.container_requests().create(
-                body=container_request
-            ).execute(num_retries=self.arvrunner.num_retries)
+            if kwargs.get("submit_request_uuid"):
+                response = self.arvrunner.api.container_requests().update(
+                    uuid=kwargs["submit_request_uuid"],
+                    body=container_request
+                ).execute(num_retries=self.arvrunner.num_retries)
+            else:
+                response = self.arvrunner.api.container_requests().create(
+                    body=container_request
+                ).execute(num_retries=self.arvrunner.num_retries)
 
             self.uuid = response["uuid"]
             self.arvrunner.process_submitted(self)
@@ -343,7 +351,6 @@ class RunnerContainer(Runner):
                 self.job_order[param] = {"$include": mnt}
 
         container_req = {
-            "owner_uuid": self.arvrunner.project_uuid,
             "name": self.name,
             "output_path": "/var/spool/cwl",
             "cwd": "/var/spool/cwl",
@@ -442,11 +449,18 @@ class RunnerContainer(Runner):
     def run(self, **kwargs):
         kwargs["keepprefix"] = "keep:"
         job_spec = self.arvados_job_spec(**kwargs)
-        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+        if self.arvrunner.project_uuid:
+            job_spec["owner_uuid"] = self.arvrunner.project_uuid
 
-        response = self.arvrunner.api.container_requests().create(
-            body=job_spec
-        ).execute(num_retries=self.arvrunner.num_retries)
+        if kwargs.get("submit_request_uuid"):
+            response = self.arvrunner.api.container_requests().update(
+                uuid=kwargs["submit_request_uuid"],
+                body=job_spec
+            ).execute(num_retries=self.arvrunner.num_retries)
+        else:
+            response = self.arvrunner.api.container_requests().create(
+                body=job_spec
+            ).execute(num_retries=self.arvrunner.num_retries)
 
         self.uuid = response["uuid"]
         self.arvrunner.process_submitted(self)
index 8268300e75b66d6f82b999f649003ec3a0615bbf..fea6adfacc323539d7c2cd595f66d441859893b8 100644 (file)
@@ -39,6 +39,7 @@ class ArvadosCommandTool(CommandLineTool):
         # Workaround for #13365
         builderargs = kwargs.copy()
         builderargs["toplevel"] = True
+        builderargs["tmp_outdir_prefix"] = ""
         builder = self._init_job(joborder, **builderargs)
         joborder = builder.job
 
index 6f731fd6877b18fc6bc434bd8110fe2b44775196..f675fb10e80811e92e90b209f073c806c9777afb 100644 (file)
@@ -277,6 +277,7 @@ class ArvadosWorkflow(Workflow):
             })
             kwargs["loader"] = self.doc_loader
             kwargs["avsc_names"] = self.doc_schema
+            kwargs["metadata"]  = self.metadata
             return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
         else:
             return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
index bf940eca4ba92fad7e01ccd06e70564cb2fa0103..5024e95f77df785abf668c68364dadc4d49fb2a4 100644 (file)
@@ -132,6 +132,7 @@ def run():
         args.priority = arvados_cwl.DEFAULT_PRIORITY
         args.do_validate = True
         args.disable_js_validation = False
+        args.tmp_outdir_prefix = "tmp"
 
         runner.arv_executor(t, job_order_object, **vars(args))
     except Exception as e:
diff --git a/sdk/cwl/arvados_cwl/http.py b/sdk/cwl/arvados_cwl/http.py
new file mode 100644 (file)
index 0000000..4516de0
--- /dev/null
@@ -0,0 +1,151 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import requests
+import email.utils
+import time
+import datetime
+import re
+import arvados
+import arvados.collection
+import urlparse
+import logging
+import calendar
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+def my_formatdate(dt):
+    """Render a datetime as an HTTP-style date string.
+
+    The datetime's time tuple is interpreted as UTC (via calendar.timegm)
+    and formatted with a literal "GMT" zone designator.
+    """
+    epoch_seconds = calendar.timegm(dt.timetuple())
+    return email.utils.formatdate(timeval=epoch_seconds, localtime=False, usegmt=True)
+
+def my_parsedate(text):
+    """Parse an RFC 2822 / HTTP date string into a naive UTC datetime.
+
+    Returns datetime(1970, 1, 1) when the text cannot be parsed, so an
+    unparseable date is treated as being far in the past.
+    """
+    parsed = email.utils.parsedate_tz(text)
+    if not parsed:
+        return datetime.datetime(1970, 1, 1)
+
+    stamp = datetime.datetime(*parsed[:6])
+    if parsed[9]:
+        # parsed[9] is the zone offset in seconds *east* of UTC, i.e.
+        # local = UTC + offset, so subtract it to adjust to UTC.
+        stamp -= datetime.timedelta(seconds=parsed[9])
+    # Otherwise TZ is zero or missing, assume UTC.
+    return stamp
+
+def fresh_cache(url, properties, now):
+    """Decide whether the cached copy of `url` can be used without revalidation.
+
+    properties[url] is expected to be a dict of saved response headers
+    that includes "Date".  Returns True when Cache-Control contains
+    "immutable", or when `now` is earlier than the expiry time taken from
+    the first s-maxage/max-age directive, else the Expires header, else a
+    default of 24 hours after the recorded Date.
+    """
+    pr = properties[url]
+    expires = None
+
+    logger.debug("Checking cache freshness for %s using %s", url, pr)
+
+    if "Cache-Control" in pr:
+        cache_control = pr["Cache-Control"]
+
+        # Cache-Control is a comma-separated list of directives (for
+        # example "public, max-age=3600"), so search the whole value
+        # rather than only matching at the start of the string.
+        if re.search(r"immutable", cache_control):
+            return True
+
+        g = re.search(r"(s-maxage|max-age)=(\d+)", cache_control)
+        if g:
+            expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
+
+    if expires is None and "Expires" in pr:
+        expires = my_parsedate(pr["Expires"])
+
+    if expires is None:
+        # Use a default cache time of 24 hours if upstream didn't set
+        # any cache headers, to reduce redundant downloads.
+        expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)
+
+    return (now < expires)
+
+def remember_headers(url, properties, headers, now):
+    """Copy the cache-relevant response headers into properties[url].
+
+    Creates the properties[url] dict if it does not exist yet.  When the
+    response carries no Date header, the formatted value of `now` is
+    recorded in its place.
+    """
+    remembered = properties.setdefault(url, {})
+    for field in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
+        if field in headers:
+            remembered[field] = headers[field]
+    if "Date" not in headers:
+        remembered["Date"] = my_formatdate(now)
+
+
+def changed(url, properties, now):
+    """Issue a HEAD request and report whether `url` may have new content.
+
+    On a 200 response, properties[url] is refreshed with the headers from
+    the HEAD response.  Returns False only when an ETag was already
+    recorded for the URL and the server reports the same ETag; otherwise
+    returns True.
+
+    Raises Exception if the HEAD request does not return status 200.
+    """
+    req = requests.head(url, allow_redirects=True)
+
+    # Check the status first so headers from an error response are never
+    # recorded as cache metadata.
+    if req.status_code != 200:
+        raise Exception("Got status %s" % req.status_code)
+
+    # Capture the stored ETag *before* remember_headers() overwrites it
+    # with the value from this response; comparing afterwards would always
+    # find the two equal and report "unchanged".
+    previous_etag = properties.get(url, {}).get("ETag")
+    remember_headers(url, properties, req.headers, now)
+
+    if previous_etag is not None and "ETag" in req.headers:
+        if previous_etag == req.headers["ETag"]:
+            return False
+
+    return True
+
+def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
+    """Download `url` into a Keep collection, reusing a cached copy when possible.
+
+    Lists collections that have a property named after the URL.  If one
+    of them is still fresh (see fresh_cache) or the server reports the
+    same ETag (see changed), a reference to it is returned without
+    downloading.  Otherwise the URL is fetched into a new collection
+    owned by `project_uuid`, and the response headers are stored in the
+    collection properties under the URL for later cache checks.
+
+    `utcnow` is a callable returning the current time (presumably
+    overridable so tests can control the clock).
+
+    Returns a "keep:<portable data hash>/<filename>" string.
+    Raises Exception if the download does not return status 200.
+    """
+    r = api.collections().list(filters=[["properties", "exists", url]]).execute()
+
+    now = utcnow()
+
+    for item in r["items"]:
+        properties = item["properties"]
+        if fresh_cache(url, properties, now):
+            # Do nothing
+            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
+            return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])
+
+        if not changed(url, properties, now):
+            # ETag didn't change, same content, just update headers
+            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
+            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
+            return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])
+
+    properties = {}
+    req = requests.get(url, stream=True, allow_redirects=True)
+
+    if req.status_code != 200:
+        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
+
+    remember_headers(url, properties, req.headers, now)
+
+    if "Content-Length" in properties[url]:
+        cl = int(properties[url]["Content-Length"])
+        logger.info("Downloading %s (%s bytes)", url, cl)
+    else:
+        cl = None
+        logger.info("Downloading %s (unknown size)", url)
+
+    c = arvados.collection.Collection()
+
+    # Prefer the filename advertised in Content-Disposition.  The header
+    # may be present without a parsable filename= parameter (for example a
+    # bare "inline"), in which case fall back to the last URL path segment
+    # instead of failing on a None match object.
+    name = None
+    disposition = req.headers.get("Content-Disposition")
+    if disposition:
+        grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', disposition)
+        if grp:
+            name = grp.group(2) or grp.group(4)
+    if not name:
+        name = urlparse.urlparse(url).path.split("/")[-1]
+
+    count = 0
+    start = time.time()
+    checkpoint = start
+    with c.open(name, "w") as f:
+        for chunk in req.iter_content(chunk_size=1024):
+            count += len(chunk)
+            f.write(chunk)
+            loopnow = time.time()
+            # Report progress at most once every 20 seconds.
+            if (loopnow - checkpoint) > 20:
+                bps = (float(count)/float(loopnow - start))
+                if cl is not None:
+                    logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
+                                float(count * 100) / float(cl),
+                                bps/(1024*1024),
+                                (cl-count)/bps)
+                else:
+                    logger.info("%d downloaded, %3.2f MiB/s", count, bps/(1024*1024))
+                checkpoint = loopnow
+
+    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)
+
+    # Record the response headers on the new collection so later calls can
+    # run the freshness / ETag checks against it.
+    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
+
+    return "keep:%s/%s" % (c.portable_data_hash(), name)
index 6fedb120300b2bdb575a663614840c5ba765b7ec..27e48f1f4408e33630985f2060ba738af720111f 100644 (file)
@@ -16,6 +16,8 @@ from schema_salad.sourceline import SourceLine
 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
 from cwltool.workflow import WorkflowException
 
+from .http import http_to_keep
+
 logger = logging.getLogger('arvados.cwl-runner')
 
 def trim_listing(obj):
@@ -81,6 +83,10 @@ class ArvPathMapper(PathMapper):
                     raise WorkflowException("File literal '%s' is missing `contents`" % src)
                 if srcobj["class"] == "Directory" and "listing" not in srcobj:
                     raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
+            elif src.startswith("http:") or src.startswith("https:"):
+                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
+                logger.info("%s is %s", src, keepref)
+                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
             else:
                 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
 
@@ -121,19 +127,6 @@ class ArvPathMapper(PathMapper):
                                                        keep_client=self.arvrunner.keep_client,
                                                        num_retries=self.arvrunner.num_retries)
 
-        already_uploaded = self.arvrunner.get_uploaded()
-        copied_files = set()
-        for k in referenced_files:
-            loc = k["location"]
-            if loc in already_uploaded:
-                v = already_uploaded[loc]
-                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
-                if self.single_collection:
-                    basename = k["basename"]
-                    if basename not in collection:
-                        self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
-                        copied_files.add((loc, basename, v.type))
-
         for srcobj in referenced_files:
             self.visit(srcobj, uploadfiles)
 
@@ -144,16 +137,12 @@ class ArvPathMapper(PathMapper):
                                          fnPattern="keep:%s/%s",
                                          name=self.name,
                                          project=self.arvrunner.project_uuid,
-                                         collection=collection)
+                                         collection=collection,
+                                         packed=False)
 
         for src, ab, st in uploadfiles:
             self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                            "Directory" if os.path.isdir(ab) else "File", True)
-            self.arvrunner.add_uploaded(src, self._pathmap[src])
-
-        for loc, basename, cls in copied_files:
-            fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
-            self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)
 
         for srcobj in referenced_files:
             remap = []
index 3ce08f6cc7971973f7e6925bbc351d65b3492592..cf91f69f818cd51e721c658cd05d5a81e9df6e05 100644 (file)
@@ -122,11 +122,18 @@ def upload_dependencies(arvrunner, name, document_loader,
         # that external references in $include and $mixin are captured.
         scanobj = loadref("", workflowobj["id"])
 
-    sc = scandeps(uri, scanobj,
+    sc_result = scandeps(uri, scanobj,
                   loadref_fields,
                   set(("$include", "$schemas", "location")),
                   loadref, urljoin=document_loader.fetcher.urljoin)
 
+    sc = []
+    def only_real(obj):
+        if obj.get("location", "").startswith("file:"):
+            sc.append(obj)
+
+    visit_class(sc_result, ("File", "Directory"), only_real)
+
     normalizeFilesDirs(sc)
 
     if include_primary and "id" in workflowobj:
index a24d53dad6a629f9d08692bb19dd62e144655a7b..13e6d36c073b1db87ef124c8d034a654f27d10ed 100644 (file)
@@ -34,7 +34,7 @@ def get_version(setup_dir, module):
     else:
         try:
             save_version(setup_dir, module, git_latest_tag() + git_timestamp_tag())
-        except subprocess.CalledProcessError:
+        except (subprocess.CalledProcessError, OSError):
             pass
 
     return read_version(setup_dir, module)
index 696837a366e51672eb8fbda1abf34c083da7f466..4c31d3b4450eac66ed5f839de2d34842913067f0 100644 (file)
@@ -33,13 +33,13 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20180508202931',
+          'cwltool==1.0.20180524215209',
           'schema-salad==2.7.20180501211602',
-          'typing==3.5.3.0',
+          'typing >= 3.5.3',
           'ruamel.yaml >=0.13.11, <0.15',
-          'arvados-python-client>=1.1.4.20180507184611',
+          'arvados-python-client>=1.1.4.20180607143841',
           'setuptools',
-          'ciso8601 >=1.0.6'
+          'ciso8601 >=1.0.6, <2.0.0'
       ],
       data_files=[
           ('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
index ddc7ff9588c67d396b28226d344a493a7a281200..a7445449af6030e7afee4bdb524ac55afc90b8ec 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: ExpressionTool
 requirements:
index 5c5571ab264097ad6ca1cee8196b00ccf92da22e..60c765788c3fad8233b5be760335407656ca13b4 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 dir:
   class: Directory
   location: samples
\ No newline at end of file
index 8c28cc2215b5d3eac21b94ad31292595dae25f43..e4730cfc76982e142cae4a9b52369e65be135af8 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: CommandLineTool
 requirements:
index 3f1e8902cc5a00488315d13cdb81a40f90000bb1..343df0bbda5c40f945f203a37d032ac154cde75a 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 requirements:
index 6c9e7f760c05c2fbeb67ae98573128709b3b0dd0..f5e5e702285e1ee545048a8b0f2ea54a61f645cf 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 {
    "cwlVersion": "v1.0",
       "arguments": [
index b37990aa9d78fb5f2efbbf93f9996dc013aa9cb0..6c49757bbe630625dcc84e01589bd917338d83e7 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: CommandLineTool
 inputs:
index 5d2c699022cfc4262b6f55e55551e726a7bc8590..19e4077e8a1e0965d331db2b958d6690e6c6e2e1 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 $namespaces:
index 8b9dd83031aa530340026b8be623db52442bf421..7eb6bcee6de4e01615be0699901dd539f8ccafe2 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 i:
   class: File
   location: keep:f225e6259bdd63bc7240599648dde9f1+97/hg19.fa
index 248aefd2c6ef9ad482f22be3f5c771a73b1e45b7..5539562070ff2c226b8188f66df8a452af7f59a8 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 $namespaces:
index 540edcf4f0dee20f99de723b32b276ab284bf403..20847d44900e630663ca1dfcc42137ae3114c98e 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 class: Workflow
 cwlVersion: v1.0
 inputs:
index 892973b5752b551f22c77cd84f977199398ac2dd..29f58f09c5085ecf9a344f38ba792d37c95f1548 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 class: CommandLineTool
 cwlVersion: v1.0
 inputs:
index 1107623925331bc231af23c9b139834752106883..2e61ee3e8c13c90bda45a30b5894615fcc6e868e 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 toplevel_input:
   class: File
   location: keep:4d8a70b1e63b2aad6984e40e338e2373+69/hello.txt
\ No newline at end of file
index 883d24e857342d98a04ff59ba9d3fe80978d78f2..254ed91b81abaf5de60d6b90de4613f3c10d35ed 100644 (file)
@@ -1 +1,5 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 pw: blorp
index 522946a4f49ee2acd68588c6100d45bcb097cbe8..2295e934ac77de76182d04749715a57f730874b4 100644 (file)
@@ -53,7 +53,8 @@ class TestContainer(unittest.TestCase):
             make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                     basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+                                                     basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+                                                     metadata={"cwlVersion": "v1.0"})
             arvtool.formatgraph = None
             for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_"+str(enable_reuse),
                                  make_fs_access=make_fs_access, tmpdir="/tmp"):
@@ -139,7 +140,7 @@ class TestContainer(unittest.TestCase):
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
                                                  avsc_names=avsc_names, make_fs_access=make_fs_access,
-                                                 loader=Loader({}))
+                                                 loader=Loader({}), metadata={"cwlVersion": "v1.0"})
         arvtool.formatgraph = None
         for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements",
                              make_fs_access=make_fs_access, tmpdir="/tmp"):
@@ -251,7 +252,7 @@ class TestContainer(unittest.TestCase):
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
                                                  avsc_names=avsc_names, make_fs_access=make_fs_access,
-                                                 loader=Loader({}))
+                                                 loader=Loader({}), metadata={"cwlVersion": "v1.0"})
         arvtool.formatgraph = None
         for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_initial_work_dir",
                              make_fs_access=make_fs_access, tmpdir="/tmp"):
@@ -352,7 +353,8 @@ class TestContainer(unittest.TestCase):
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+                                                 metadata={"cwlVersion": "v1.0"})
         arvtool.formatgraph = None
         for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
                              make_fs_access=make_fs_access, tmpdir="/tmp"):
@@ -477,7 +479,8 @@ class TestContainer(unittest.TestCase):
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                      collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+                                                 metadata={"cwlVersion": "v1.0"})
         arvtool.formatgraph = None
         job_order = {
             "p1": {
@@ -584,7 +587,8 @@ class TestContainer(unittest.TestCase):
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                      collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+                                                 metadata={"cwlVersion": "v1.0"})
         arvtool.formatgraph = None
 
         job_order = {"pw": "blorp"}
diff --git a/sdk/cwl/tests/test_http.py b/sdk/cwl/tests/test_http.py
new file mode 100644 (file)
index 0000000..0c66c39
--- /dev/null
@@ -0,0 +1,286 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import copy
+import cStringIO
+import functools
+import hashlib
+import json
+import logging
+import mock
+import sys
+import unittest
+import datetime
+
+import arvados
+import arvados.collection
+import arvados_cwl
+import arvados_cwl.runner
+import arvados.keep
+
+from .matcher import JsonDiffMatcher, StripYAMLComments
+from .mock_discovery import get_rootDesc
+
+import arvados_cwl.http
+
+import ruamel.yaml as yaml
+
+
+class TestHttpToKeep(unittest.TestCase):
+
+    @mock.patch("requests.get")
+    @mock.patch("arvados.collection.Collection")
+    def test_http_get(self, collectionmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": []
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+        collectionmock.return_value = cm
+
+        req = mock.MagicMock()
+        req.status_code = 200
+        req.headers = {}
+        req.iter_content.return_value = ["abc"]
+        getmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 15)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
+
+        cm.open.assert_called_with("file1.txt", "w")
+        cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt",
+                                       owner_uuid=None, ensure_unique_name=True)
+
+        api.collections().update.assert_has_calls([
+            mock.call(uuid=cm.manifest_locator(),
+                      body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
+        ])
+
+
+    @mock.patch("requests.get")
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_http_expires(self, collectionmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": [{
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+                "portable_data_hash": "99999999999999999999999999999998+99",
+                "properties": {
+                    'http://example.com/file1.txt': {
+                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+                        'Expires': 'Tue, 17 May 2018 00:00:00 GMT'
+                    }
+                }
+            }]
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+        cm.keys.return_value = ["file1.txt"]
+        collectionmock.return_value = cm
+
+        req = mock.MagicMock()
+        req.status_code = 200
+        req.headers = {}
+        req.iter_content.return_value = ["abc"]
+        getmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 16)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+        getmock.assert_not_called()
+
+
+    @mock.patch("requests.get")
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_http_cache_control(self, collectionmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": [{
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+                "portable_data_hash": "99999999999999999999999999999998+99",
+                "properties": {
+                    'http://example.com/file1.txt': {
+                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+                        'Cache-Control': 'max-age=172800'
+                    }
+                }
+            }]
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+        cm.keys.return_value = ["file1.txt"]
+        collectionmock.return_value = cm
+
+        req = mock.MagicMock()
+        req.status_code = 200
+        req.headers = {}
+        req.iter_content.return_value = ["abc"]
+        getmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 16)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+        getmock.assert_not_called()
+
+
+    @mock.patch("requests.get")
+    @mock.patch("requests.head")
+    @mock.patch("arvados.collection.Collection")
+    def test_http_expired(self, collectionmock, headmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": [{
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+                "portable_data_hash": "99999999999999999999999999999998+99",
+                "properties": {
+                    'http://example.com/file1.txt': {
+                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+                        'Expires': 'Tue, 16 May 2018 00:00:00 GMT'
+                    }
+                }
+            }]
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz4"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999997+99"
+        cm.keys.return_value = ["file1.txt"]
+        collectionmock.return_value = cm
+
+        req = mock.MagicMock()
+        req.status_code = 200
+        req.headers = {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}
+        req.iter_content.return_value = ["def"]
+        getmock.return_value = req
+        headmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 17)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999997+99/file1.txt")
+
+        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
+
+        cm.open.assert_called_with("file1.txt", "w")
+        cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt",
+                                       owner_uuid=None, ensure_unique_name=True)
+
+        api.collections().update.assert_has_calls([
+            mock.call(uuid=cm.manifest_locator(),
+                      body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}}}})
+        ])
+
+
+    @mock.patch("requests.get")
+    @mock.patch("requests.head")
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_http_etag(self, collectionmock, headmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": [{
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+                "portable_data_hash": "99999999999999999999999999999998+99",
+                "properties": {
+                    'http://example.com/file1.txt': {
+                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+                        'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
+                        'ETag': '123456'
+                    }
+                }
+            }]
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+        cm.keys.return_value = ["file1.txt"]
+        collectionmock.return_value = cm
+
+        req = mock.MagicMock()
+        req.status_code = 200
+        req.headers = {
+            'Date': 'Tue, 17 May 2018 00:00:00 GMT',
+            'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
+            'ETag': '123456'
+        }
+        headmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 17)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+        getmock.assert_not_called()
+        cm.open.assert_not_called()
+
+        api.collections().update.assert_has_calls([
+            mock.call(uuid=cm.manifest_locator(),
+                      body={"collection":{"properties": {'http://example.com/file1.txt': {
+                          'Date': 'Tue, 17 May 2018 00:00:00 GMT',
+                          'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
+                          'ETag': '123456'
+                      }}}})
+                      ])
+
+    @mock.patch("requests.get")
+    @mock.patch("arvados.collection.Collection")
+    def test_http_content_disp(self, collectionmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": []
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+        collectionmock.return_value = cm
+
+        req = mock.MagicMock()
+        req.status_code = 200
+        req.headers = {"Content-Disposition": "attachment; filename=file1.txt"}
+        req.iter_content.return_value = ["abc"]
+        getmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 15)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+        getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True)
+
+        cm.open.assert_called_with("file1.txt", "w")
+        cm.save_new.assert_called_with(name="Downloaded from http://example.com/download?fn=/file1.txt",
+                                       owner_uuid=None, ensure_unique_name=True)
+
+        api.collections().update.assert_has_calls([
+            mock.call(uuid=cm.manifest_locator(),
+                      body={"collection":{"properties": {"http://example.com/download?fn=/file1.txt": {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
+        ])
index 6d2598edaa8e4bdf2894c83a106c171d34fd1937..30930dd49abb1a91fe371cc291cd916feac735df 100644 (file)
@@ -59,7 +59,8 @@ class TestJob(unittest.TestCase):
             make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                                     basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+                                                     basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+                                                     metadata={"cwlVersion": "v1.0"})
             arvtool.formatgraph = None
             for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
                 j.run(enable_reuse=enable_reuse)
@@ -150,7 +151,8 @@ class TestJob(unittest.TestCase):
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                                 make_fs_access=make_fs_access, loader=Loader({}))
+                                                 make_fs_access=make_fs_access, loader=Loader({}),
+                                                 metadata={"cwlVersion": "v1.0"})
         arvtool.formatgraph = None
         for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
             j.run(enable_reuse=True)
@@ -346,7 +348,7 @@ class TestWorkflow(unittest.TestCase):
                                               basedir="", make_fs_access=make_fs_access, loader=document_loader,
                                               makeTool=runner.arv_make_tool, metadata=metadata)
         arvtool.formatgraph = None
-        it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
+        it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
         it.next().run()
         it.next().run()
 
@@ -432,7 +434,7 @@ class TestWorkflow(unittest.TestCase):
                                               basedir="", make_fs_access=make_fs_access, loader=document_loader,
                                               makeTool=runner.arv_make_tool, metadata=metadata)
         arvtool.formatgraph = None
-        it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
+        it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
         it.next().run()
         it.next().run()
 
index 9649b838726d845ebe56418934852fe080f2dfc7..eaa57114222233d6bcbd02ff2674c89f5169b168 100644 (file)
@@ -20,7 +20,7 @@ from .mock_discovery import get_rootDesc
 
 from arvados_cwl.pathmapper import ArvPathMapper
 
-def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None, collection=None):
+def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None, collection=None, packed=None):
     pdh = "99999999999999999999999999999991+99"
     for c in files:
         c.keepref = "%s/%s" % (pdh, os.path.basename(c.fn))
@@ -66,23 +66,6 @@ class TestPathmap(unittest.TestCase):
         self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
                          p._pathmap)
 
-    @mock.patch("arvados.commands.run.uploadfiles")
-    def test_prev_uploaded(self, upl):
-        """Test pathmapper handling previously uploaded files."""
-
-        arvrunner = arvados_cwl.ArvCwlRunner(self.api)
-        arvrunner.add_uploaded('file:tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999992+99/hw.py', target='', type='File', staged=True))
-
-        upl.side_effect = upload_mock
-
-        p = ArvPathMapper(arvrunner, [{
-            "class": "File",
-            "location": "file:tests/hw.py"
-        }], "", "/test/%s", "/test/%s/%s")
-
-        self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999992+99/hw.py', target='/test/99999999999999999999999999999992+99/hw.py', type='File', staged=True)},
-                         p._pathmap)
-
     @mock.patch("arvados.commands.run.uploadfiles")
     @mock.patch("arvados.commands.run.statfile")
     def test_statfile(self, statfile, upl):
index 77bef075fa97903e96bcc18e5f6c2c11dc4f5654..f8b557f6cbe86bf4b90bc55a3f4941c88560d948 100644 (file)
@@ -234,7 +234,6 @@ def stubs(func):
             },
             'secret_mounts': {},
             'state': 'Committed',
-            'owner_uuid': None,
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
                         '--eval-timeout=20', '--thread-count=4',
@@ -751,7 +750,6 @@ class TestSubmit(unittest.TestCase):
                     'kind': 'json'
                 }
             }, 'state': 'Committed',
-            'owner_uuid': None,
             'output_path': '/var/spool/cwl',
             'name': 'expect_arvworkflow.cwl#main',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
@@ -870,7 +868,6 @@ class TestSubmit(unittest.TestCase):
                     'kind': 'json'
                 }
             }, 'state': 'Committed',
-            'owner_uuid': None,
             'output_path': '/var/spool/cwl',
             'name': 'a test workflow',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
@@ -1236,7 +1233,6 @@ class TestSubmit(unittest.TestCase):
             },
             "name": "secret_wf.cwl",
             "output_path": "/var/spool/cwl",
-            "owner_uuid": None,
             "priority": 500,
             "properties": {},
             "runtime_constraints": {
@@ -1259,6 +1255,31 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_container_request_uuid + '\n')
 
+    @stubs
+    def test_submit_request_uuid(self, stubs):
+        stubs.expect_container_request_uuid = "zzzzz-xvhdp-yyyyyyyyyyyyyyy"
+
+        stubs.api.container_requests().update().execute.return_value = {
+            "uuid": stubs.expect_container_request_uuid,
+            "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
+            "state": "Queued"
+        }
+
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--submit-request-uuid=zzzzz-xvhdp-yyyyyyyyyyyyyyy",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        stubs.api.container_requests().update.assert_called_with(
+            uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
 
 class TestCreateTemplate(unittest.TestCase):
     existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
index ed09e6e27f6ed26c05b1671b0b2a19cfe994a987..fdd56bedfd0610f995661f73dd73e688c484a9bf 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: CommandLineTool
 inputs:
index c8264647161cf412fd20cbcd08febf01ca92154f..98931ab7eceb74e6429e663ff52b34fdb984e4f4 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: CommandLineTool
 inputs:
index ab7b0a4c6f345711d5cc1ce550edb97f803ba7d6..3d0fe224f261a19245977444ecd308dffb907a55 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: CommandLineTool
 inputs:
index cd57ff34cd98515176d9498639d2762ab189fd5c..8bfc5d63f744a784e13d78ec48049211ae629c48 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 inputs: []
index d4f667c03443a18aea9b4e5f36e32a335cf991b8..2e66b1013d2f435d7d6232e8e10f4180e3c6b3f1 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 inputs: []
index 597ea96da35cd82f70ac3e3424f727655117b4f2..f779aef8cf6d7ab6d24c49b22b568ffb91e638d2 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 inputs: []
index 76437a29825398846b6afd9a2ae830b23ca7d72f..ccc0ceb7325105184465b0b88f61eedcc1c4d3cc 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: CommandLineTool
 inputs:
index 4e6372b62048fad5a14e259d2ad511088402a120..5c74ef0045f03a414b255e56b5a2a81c65923589 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 inputs:
index df9009ada51bd61ab765191bc765b597b0c382fb..4b71c1341542cf0a573a941c240bbc2e4cb49a4f 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: CommandLineTool
 inputs:
index 45faa8937c517d809fff39adae6a62fcf5c70945..0133c7aef13dd068c0074c3d0e650b0116fde6ea 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 inputs: []
index 7ba96eea4d8bd1023c95182b74f0e4502e928acd..ffe8731438109d218a5da2316d05d4376bbeff71 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 inputs: []
index 911650d61fcd53f0068d865cf6637ec8872fdab9..0292d1377c917f58bccb63194825c68a29288188 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 inputs: []
index d6e65afd6c355b75b2e82e6d7656b2f714046364..6e562e43dbd791f390dd25f6803e4a23c49ce967 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 inputs: []
index 631af182b9e46893848cfe2c3619950a02527b32..de2748c7322619b69499ee5e11640815412187bd 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 inputs: []
index bd26cc1d1bd6d5d2b907c7372c07d1663ace963c..6bcf69ed7fcd50561377878d7e1da9efca7a6240 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 inputs: []
index ac07b9dbb21f333b136be5106ab98c72545502c0..715f1efcdfb542065faf11bc208463918a9a3c88 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 inputs: []
index 55b7b19430dc6a05a6ffde432ffb33a8d7aaa8c1..355872232bc7f430a8b61f7e8f8dffbe09cc5530 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 import arvados
 import sys
 import os
index 29dc3d6aea63701cdcedf8d4226127d6a2c24d99..d7c8037588c2ff64e43da8f2e59f991e974141ab 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 requirements:
index 63a543881c230941db6ee45b177ab706e057e384..5cdd80dbdbc7a145d9e712d596b1165c91520dcc 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 $namespaces:
index b7893e221104308771c334c984ea0ad303646eb6..0a734b326355361f6b8ccc4bc296384e58a57d2e 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: CommandLineTool
 requirements:
index 4db11ccdf2eb9ff26dcd5305ee528b418cb065ce..7a052f86cf36168343eab48b3c9738f5da55497e 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: CommandLineTool
 requirements:
index 0ddeb645022374effe499c1f164b7c61834effa0..2be74b2658b1b67ebdb231e22c6204e53aabc96b 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: CommandLineTool
 $namespaces:
index 17c92d678e3edbe9db442ab74e6c4c23fbee8333..05d950d18c08be14ea72ea297585412136f8f198 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 cwlVersion: v1.0
 class: Workflow
 $namespaces:
index 9ed0eacf23e6d753c1b6c2a0f781282c96dde8cc..b0c7069cd98007ec0f24ad45cb809c137784eaae 100644 (file)
@@ -62,6 +62,7 @@ type InstanceType struct {
        RAM          int64
        Scratch      int64
        Price        float64
+       Preemptable  bool
 }
 
 // GetThisSystemNode returns a SystemNode for the node we're running
index daafc4995448524f7fe3794b9facd13e01480823..e71bcd5d0da5cdeaebdc6dfd3be05cd81d681d58 100644 (file)
@@ -52,7 +52,8 @@ type RuntimeConstraints struct {
 // SchedulingParameters specify a container's scheduling parameters
 // such as Partitions
 type SchedulingParameters struct {
-       Partitions []string `json:"partitions"`
+       Partitions  []string `json:"partitions"`
+       Preemptable bool     `json:"preemptable"`
 }
 
 // ContainerList is an arvados#containerList resource.
index 127bee8ddf7c6e641597d92c935353eece84564b..9af8d0ad405828b0c8e9906575cb1b77752eaa63 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 package arvados
 
 import (
index 297a8617084b3247c7b15c18a8ae9c2176224518..5edb1f95ca86acbfac5bf7dfd961822a33003ee1 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 package health
 
 import (
index 7e601f2e70211e30b1786edae098380ba89556e6..8a540371cbbf01ffcf7bf1bb97b94713ad303f74 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 package health
 
 import (
index d88e767dd2252dca331cb54e7645292558d623bd..169f1457e2e06e6e3424856809c92fc5dc74d4f9 100644 (file)
@@ -101,6 +101,7 @@ type KeepClient struct {
        Retries            int
        BlockCache         *BlockCache
        RequestID          string
+       StorageClasses     []string
 
        // set to 1 if all writable services are of disk type, otherwise 0
        replicasPerService int
index 3b8de262be395295f29adc79a1e756dd8dd3c4e2..dc80ad7e1d6378ad09da968db62cf038002d0b9c 100644 (file)
@@ -93,16 +93,18 @@ func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
 }
 
 type StubPutHandler struct {
-       c              *C
-       expectPath     string
-       expectApiToken string
-       expectBody     string
-       handled        chan string
+       c                  *C
+       expectPath         string
+       expectApiToken     string
+       expectBody         string
+       expectStorageClass string
+       handled            chan string
 }
 
 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
        sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
+       sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
        body, err := ioutil.ReadAll(req.Body)
        sph.c.Check(err, Equals, nil)
        sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
@@ -148,11 +150,12 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
                "acbd18db4cc2f85cedef654fccc4a4d8",
                "abc123",
                "foo",
+               "hot",
                make(chan string)}
 
        UploadToStubHelper(c, st,
                func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) {
-
+                       kc.StorageClasses = []string{"hot"}
                        go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), kc.getRequestID())
 
                        writer.Write([]byte("foo"))
@@ -170,6 +173,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
                "acbd18db4cc2f85cedef654fccc4a4d8",
                "abc123",
                "foo",
+               "",
                make(chan string)}
 
        UploadToStubHelper(c, st,
@@ -265,6 +269,7 @@ func (s *StandaloneSuite) TestPutB(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -306,6 +311,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -354,6 +360,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 4)}
 
        fh := FailHandler{
@@ -412,6 +419,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 1)}
 
        fh := FailHandler{
@@ -989,6 +997,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -1027,6 +1036,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -1198,6 +1208,7 @@ func (s *StandaloneSuite) TestPutBRetry(c *C) {
                        Md5String("foo"),
                        "abc123",
                        "foo",
+                       "",
                        make(chan string, 5)}}
 
        arv, _ := arvadosclient.MakeArvadosClient()
index bfe8d5b77a4410929ba7f8a23ffbdf3435e58588..542827f5e0d83c5d074942ef4546955e59b46ba5 100644 (file)
@@ -80,6 +80,9 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
        req.Header.Add("Authorization", "OAuth2 "+this.Arvados.ApiToken)
        req.Header.Add("Content-Type", "application/octet-stream")
        req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
+       if len(this.StorageClasses) > 0 {
+               req.Header.Add("X-Keep-Storage-Classes", strings.Join(this.StorageClasses, ", "))
+       }
 
        var resp *http.Response
        if resp, err = this.httpClient().Do(req); err != nil {
index 8fb90c944396967e6863a38daee27ffe3cb8b9ec..e38a6bd475c7b8a4aee7787a537a40656fd93b36 100644 (file)
@@ -13,6 +13,8 @@ import os
 import re
 import errno
 import hashlib
+import datetime
+import ciso8601
 import time
 import threading
 
@@ -1269,6 +1271,18 @@ class Collection(RichCollectionBase):
     def root_collection(self):
         return self
 
+    def get_properties(self):
+        if self._api_response and self._api_response["properties"]:
+            return self._api_response["properties"]
+        else:
+            return {}
+
+    def get_trash_at(self):
+        if self._api_response and self._api_response["trash_at"]:
+            return ciso8601.parse_datetime(self._api_response["trash_at"])
+        else:
+            return None
+
     def stream_name(self):
         return "."
 
@@ -1436,17 +1450,34 @@ class Collection(RichCollectionBase):
     @must_be_writable
     @synchronized
     @retry_method
-    def save(self, merge=True, num_retries=None):
+    def save(self,
+             properties=None,
+             storage_classes=None,
+             trash_at=None,
+             merge=True,
+             num_retries=None):
         """Save collection to an existing collection record.
 
         Commit pending buffer blocks to Keep, merge with remote record (if
-        merge=True, the default), and update the collection record.  Returns
+        merge=True, the default), and update the collection record. Returns
         the current manifest text.
 
         Will raise AssertionError if not associated with a collection record on
         the API server.  If you want to save a manifest to Keep only, see
         `save_new()`.
 
+        :properties:
+          Additional properties of the collection. This value will replace any
+          existing properties of the collection.
+
+        :storage_classes:
+          Specify the desired storage classes to be used when writing data to Keep.
+
+        :trash_at:
+          A collection is *expiring* when it has a *trash_at* time in the future.
+          An expiring collection can be accessed as normal,
+          but is scheduled to be trashed automatically at the *trash_at* time.
+
         :merge:
           Update and merge remote changes before saving.  Otherwise, any
           remote changes will be ignored and overwritten.
@@ -1455,6 +1486,24 @@ class Collection(RichCollectionBase):
           Retry count on API calls (if None,  use the collection default)
 
         """
+        if properties and type(properties) is not dict:
+            raise errors.ArgumentError("properties must be dictionary type.")
+
+        if storage_classes and type(storage_classes) is not list:
+            raise errors.ArgumentError("storage_classes must be list type.")
+
+        if trash_at and type(trash_at) is not datetime.datetime:
+            raise errors.ArgumentError("trash_at must be datetime type.")
+
+        body={}
+        if properties:
+            body["properties"] = properties
+        if storage_classes:
+            body["storage_classes_desired"] = storage_classes
+        if trash_at:
+            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+            body["trash_at"] = t
+
         if not self.committed():
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
@@ -1465,14 +1514,20 @@ class Collection(RichCollectionBase):
                 self.update()
 
             text = self.manifest_text(strip=False)
+            body['manifest_text'] = text
+
             self._remember_api_response(self._my_api().collections().update(
                 uuid=self._manifest_locator,
-                body={'manifest_text': text}
-                ).execute(
-                    num_retries=num_retries))
+                body=body
+                ).execute(num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
             self._portable_data_hash = self._api_response["portable_data_hash"]
             self.set_committed(True)
+        elif body:
+            self._remember_api_response(self._my_api().collections().update(
+                uuid=self._manifest_locator,
+                body=body
+                ).execute(num_retries=num_retries))
 
         return self._manifest_text
 
@@ -1483,6 +1538,9 @@ class Collection(RichCollectionBase):
     def save_new(self, name=None,
                  create_collection_record=True,
                  owner_uuid=None,
+                 properties=None,
+                 storage_classes=None,
+                 trash_at=None,
                  ensure_unique_name=False,
                  num_retries=None):
         """Save collection to a new collection record.
@@ -1490,7 +1548,7 @@ class Collection(RichCollectionBase):
         Commit pending buffer blocks to Keep and, when create_collection_record
         is True (default), create a new collection record.  After creating a
         new collection record, this Collection object will be associated with
-        the new record used by `save()`.  Returns the current manifest text.
+        the new record used by `save()`. Returns the current manifest text.
 
         :name:
           The collection name.
@@ -1503,6 +1561,18 @@ class Collection(RichCollectionBase):
           the user, or project uuid that will own this collection.
           If None, defaults to the current user.
 
+        :properties:
+          Additional properties of the collection. This value will replace any
+          existing properties of the collection.
+
+        :storage_classes:
+          Specify the desired storage classes to be used when writing data to Keep.
+
+        :trash_at:
+          A collection is *expiring* when it has a *trash_at* time in the future.
+          An expiring collection can be accessed as normal,
+          but is scheduled to be trashed automatically at the *trash_at* time.
+
         :ensure_unique_name:
           If True, ask the API server to rename the collection
           if it conflicts with a collection with the same name and owner.  If
@@ -1512,6 +1582,15 @@ class Collection(RichCollectionBase):
           Retry count on API calls (if None,  use the collection default)
 
         """
+        if properties and type(properties) is not dict:
+            raise errors.ArgumentError("properties must be dictionary type.")
+
+        if storage_classes and type(storage_classes) is not list:
+            raise errors.ArgumentError("storage_classes must be list type.")
+
+        if trash_at and type(trash_at) is not datetime.datetime:
+            raise errors.ArgumentError("trash_at must be datetime type.")
+
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
 
@@ -1525,6 +1604,13 @@ class Collection(RichCollectionBase):
                     "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
+            if properties:
+                body["properties"] = properties
+            if storage_classes:
+                body["storage_classes_desired"] = storage_classes
+            if trash_at:
+                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+                body["trash_at"] = t
 
             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
index 388d87b3a6f99ce51dc6b39248fd6810394828f3..cba00c3c8cf153039de990d27867558d0dbc699a 100644 (file)
@@ -140,6 +140,10 @@ physical storage devices (e.g., disks) should have a copy of each data
 block. Default is to use the server-provided default (if any) or 2.
 """)
 
+upload_opts.add_argument('--storage-classes', help="""
+Specify comma separated list of storage classes to be used when saving data to Keep.
+""")
+
 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                          help="""
 Set the number of upload threads to be used. Take into account that
@@ -418,8 +422,8 @@ class ArvPutUploadJob(object):
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                  name=None, owner_uuid=None, api_client=None,
                  ensure_unique_name=False, num_retries=None,
-                 put_threads=None, replication_desired=None,
-                 filename=None, update_time=60.0, update_collection=None,
+                 put_threads=None, replication_desired=None, filename=None,
+                 update_time=60.0, update_collection=None, storage_classes=None,
                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                  follow_links=True, exclude_paths=[], exclude_names=None):
         self.paths = paths
@@ -439,6 +443,7 @@ class ArvPutUploadJob(object):
         self.replication_desired = replication_desired
         self.put_threads = put_threads
         self.filename = filename
+        self.storage_classes = storage_classes
         self._api_client = api_client
         self._state_lock = threading.Lock()
         self._state = None # Previous run state (file list & manifest)
@@ -614,10 +619,14 @@ class ArvPutUploadJob(object):
                 else:
                     # The file already exist on remote collection, skip it.
                     pass
-            self._remote_collection.save(num_retries=self.num_retries)
+            self._remote_collection.save(storage_classes=self.storage_classes,
+                                         num_retries=self.num_retries)
         else:
+            if self.storage_classes is None:
+                self.storage_classes = ['default']
             self._local_collection.save_new(
                 name=self.name, owner_uuid=self.owner_uuid,
+                storage_classes=self.storage_classes,
                 ensure_unique_name=self.ensure_unique_name,
                 num_retries=self.num_retries)
 
@@ -1045,6 +1054,15 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     else:
         reporter = None
 
+    #  Split storage-classes argument
+    storage_classes = None
+    if args.storage_classes:
+        storage_classes = args.storage_classes.strip().split(',')
+        if len(storage_classes) > 1:
+            logger.error("Multiple storage classes are not supported currently.")
+            sys.exit(1)
+
+
     # Setup exclude regex from all the --exclude arguments provided
     name_patterns = []
     exclude_paths = []
@@ -1102,6 +1120,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
                                  owner_uuid = project_uuid,
                                  ensure_unique_name = True,
                                  update_collection = args.update_collection,
+                                 storage_classes=storage_classes,
                                  logger=logger,
                                  dry_run=args.dry_run,
                                  follow_links=args.follow_links,
index 831e496a29786d58d5272e4cffda0e4d24838bdd..c4748fa995759ef0cc934b699a14523f8a3181f8 100644 (file)
@@ -136,20 +136,21 @@ def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)",
 
     return prefix+fn
 
-def write_file(collection, pathprefix, fn):
+def write_file(collection, pathprefix, fn, flush=False):
     with open(os.path.join(pathprefix, fn)) as src:
         dst = collection.open(fn, "w")
         r = src.read(1024*128)
         while r:
             dst.write(r)
             r = src.read(1024*128)
-        dst.close(flush=False)
+        dst.close(flush=flush)
 
 def uploadfiles(files, api, dry_run=False, num_retries=0,
                 project=None,
                 fnPattern="$(file %s/%s)",
                 name=None,
-                collection=None):
+                collection=None,
+                packed=True):
     # Find the smallest path prefix that includes all the files that need to be uploaded.
     # This starts at the root and iteratively removes common parent directory prefixes
     # until all file paths no longer have a common parent.
@@ -199,12 +200,12 @@ def uploadfiles(files, api, dry_run=False, num_retries=0,
                 continue
             prev = localpath
             if os.path.isfile(localpath):
-                write_file(collection, pathprefix, f.fn)
+                write_file(collection, pathprefix, f.fn, not packed)
             elif os.path.isdir(localpath):
                 for root, dirs, iterfiles in os.walk(localpath):
                     root = root[len(pathprefix):]
                     for src in iterfiles:
-                        write_file(collection, pathprefix, os.path.join(root, src))
+                        write_file(collection, pathprefix, os.path.join(root, src), not packed)
 
         filters=[["portable_data_hash", "=", collection.portable_data_hash()]]
         if name:
index dfeaffffaf50de65647562e29d57e33751ff9f55..6e3f59dd7ede16fee8eb1da387fa93a8ec83fc4d 100644 (file)
@@ -46,7 +46,7 @@ setup(name='arvados-python-client',
           ('share/doc/arvados-python-client', ['LICENSE-2.0.txt', 'README.rst']),
       ],
       install_requires=[
-          'ciso8601 >=1.0.6',
+          'ciso8601 >=1.0.6, <2.0.0',
           'future',
           'google-api-python-client >=1.6.2, <1.7',
           'httplib2 >=0.9.2',
index 4b1f69477e5823502b2a5396a586db82a56e6ff7..93cfdc2a36c26389a3259222304e7ba1d5de7dff 100644 (file)
@@ -730,6 +730,11 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers,
                           self.call_main_with_args,
                           ['--project-uuid', self.Z_UUID, '--stream'])
 
+    def test_error_when_multiple_storage_classes_specified(self):
+        self.assertRaises(SystemExit,
+                          self.call_main_with_args,
+                          ['--storage-classes', 'hot,cold'])
+
     def test_error_when_excluding_absolute_path(self):
         tmpdir = self.make_tmpdir()
         self.assertRaises(SystemExit,
@@ -1061,6 +1066,18 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                                        '--project-uuid', self.PROJECT_UUID])
         self.assertEqual(link_name, collection['name'])
 
+    def test_put_collection_with_storage_classes_specified(self):
+        collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
+
+        self.assertEqual(len(collection['storage_classes_desired']), 1)
+        self.assertEqual(collection['storage_classes_desired'][0], 'hot')
+
+    def test_put_collection_without_storage_classes_specified(self):
+        collection = self.run_and_find_collection("")
+
+        self.assertEqual(len(collection['storage_classes_desired']), 1)
+        self.assertEqual(collection['storage_classes_desired'][0], 'default')
+
     def test_exclude_filename_pattern(self):
         tmpdir = self.make_tmpdir()
         tmpsubdir = os.path.join(tmpdir, 'subdir')
index 49c00191bebe02cc8e267b397212a893a33f246a..722cc56046c99777f864833be641e81914039af5 100644 (file)
@@ -14,6 +14,8 @@ import random
 import re
 import sys
 import tempfile
+import datetime
+import ciso8601
 import time
 import unittest
 
@@ -802,6 +804,18 @@ class CollectionMethods(run_test_server.TestCaseWithServers):
         self.assertEqual(fn0, c.items()[0][0])
         self.assertEqual(fn1, c.items()[1][0])
 
+    def test_get_properties(self):
+        c = Collection()
+        self.assertEqual(c.get_properties(), {})
+        c.save_new(properties={"foo":"bar"})
+        self.assertEqual(c.get_properties(), {"foo":"bar"})
+
+    def test_get_trash_at(self):
+        c = Collection()
+        self.assertEqual(c.get_trash_at(), None)
+        c.save_new(trash_at=datetime.datetime(2111, 1, 1, 11, 11, 11, 111111))
+        self.assertEqual(c.get_trash_at(), ciso8601.parse_datetime('2111-01-01T11:11:11.111111000Z'))
+
 
 class CollectionOpenModes(run_test_server.TestCaseWithServers):
 
@@ -1300,17 +1314,43 @@ class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
 
     def test_create_and_save(self):
         c = self.create_count_txt()
-        c.save()
+        c.save(properties={'type' : 'Intermediate'},
+               storage_classes=['archive'],
+               trash_at=datetime.datetime(2111, 1, 1, 11, 11, 11, 111111))
+
         self.assertRegex(
             c.manifest_text(),
             r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
+        self.assertEqual(c.api_response()["storage_classes_desired"], ['archive'])
+        self.assertEqual(c.api_response()["properties"], {'type' : 'Intermediate'})
+        self.assertEqual(c.api_response()["trash_at"], '2111-01-01T11:11:11.111111000Z')
+
 
     def test_create_and_save_new(self):
         c = self.create_count_txt()
-        c.save_new()
+        c.save_new(properties={'type' : 'Intermediate'},
+                   storage_classes=['archive'],
+                   trash_at=datetime.datetime(2111, 1, 1, 11, 11, 11, 111111))
+
         self.assertRegex(
             c.manifest_text(),
             r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
+        self.assertEqual(c.api_response()["storage_classes_desired"], ['archive'])
+        self.assertEqual(c.api_response()["properties"], {'type' : 'Intermediate'})
+        self.assertEqual(c.api_response()["trash_at"], '2111-01-01T11:11:11.111111000Z')
+
+    def test_create_and_save_after_commiting(self):
+        c = self.create_count_txt()
+        c.save(properties={'type' : 'Intermediate'},
+               storage_classes=['hot'],
+               trash_at=datetime.datetime(2111, 1, 1, 11, 11, 11, 111111))
+        c.save(properties={'type' : 'Output'},
+               storage_classes=['cold'],
+               trash_at=datetime.datetime(2222, 2, 2, 22, 22, 22, 222222))
+
+        self.assertEqual(c.api_response()["storage_classes_desired"], ['cold'])
+        self.assertEqual(c.api_response()["properties"], {'type' : 'Output'})
+        self.assertEqual(c.api_response()["trash_at"], '2222-02-02T22:22:22.222222000Z')
 
     def test_create_diff_apply(self):
         c1 = self.create_count_txt()
index fa29dbd8135453587cee7a7fcfeb220f864d0755..25cb0037a253de47a7b6b55da10e7ad8e9c758ea 100644 (file)
@@ -21,11 +21,7 @@ class Arvados::V1::ContainersController < ApplicationController
   end
 
   def update
-    # container updates can trigger container request lookups, which
-    # can deadlock if we don't lock the container_requests table
-    # first.
-    @object.transaction do
-      ActiveRecord::Base.connection.execute('LOCK container_requests, containers IN EXCLUSIVE MODE')
+    @object.with_lock do
       @object.reload
       super
     end
index 1dbdb571050a70ec3a684f18f335269ac35fd6f8..7ec9845bc1983c0819f4d801e5044d8e5765f00f 100644 (file)
@@ -5,6 +5,7 @@
 require 'log_reuse_info'
 require 'whitelist_update'
 require 'safe_json'
+require 'update_priority'
 
 class Container < ArvadosModel
   include ArvadosModelUpdates
@@ -37,6 +38,7 @@ class Container < ArvadosModel
   before_save :scrub_secret_mounts
   after_save :handle_completed
   after_save :propagate_priority
+  after_commit { UpdatePriority.run_update_thread }
 
   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
   belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
@@ -126,7 +128,6 @@ class Container < ArvadosModel
       # Update the priority of child container requests to match new
       # priority of the parent container (ignoring requests with no
       # container assigned, because their priority doesn't matter).
-      ActiveRecord::Base.connection.execute('LOCK container_requests, containers IN EXCLUSIVE MODE')
       ContainerRequest.
         where(requesting_container_uuid: self.uuid,
               state: ContainerRequest::Committed).
@@ -316,10 +317,6 @@ class Container < ArvadosModel
     # (because state might have changed while acquiring the lock).
     check_lock_fail
     transaction do
-      # Locking involves assigning auth_uuid, which involves looking
-      # up container requests, so we must lock both tables in the
-      # proper order to avoid deadlock.
-      ActiveRecord::Base.connection.execute('LOCK container_requests, containers IN EXCLUSIVE MODE')
       reload
       check_lock_fail
       update_attributes!(state: Locked)
@@ -542,7 +539,6 @@ class Container < ArvadosModel
     if self.state_changed? and self.final?
       act_as_system_user do
 
-        ActiveRecord::Base.connection.execute('LOCK container_requests, containers IN EXCLUSIVE MODE')
         if self.state == Cancelled
           retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
         else
index ac4415bf2e618c0e8ef4c6fc6a06d9191cb91db4..42fd247f729cd294a7f8500caf24c177cf974060 100644 (file)
@@ -28,11 +28,12 @@ class ContainerRequest < ArvadosModel
 
   before_validation :fill_field_defaults, :if => :new_record?
   before_validation :validate_runtime_constraints
-  before_validation :validate_scheduling_parameters
   before_validation :set_container
+  before_validation :set_default_preemptable_scheduling_parameter
   validates :command, :container_image, :output_path, :cwd, :presence => true
   validates :output_ttl, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
   validates :priority, numericality: { only_integer: true, greater_than_or_equal_to: 0, less_than_or_equal_to: 1000 }
+  validate :validate_scheduling_parameters
   validate :validate_state_change
   validate :check_update_whitelist
   validate :secret_mounts_key_conflict
@@ -83,10 +84,10 @@ class ContainerRequest < ArvadosModel
     Committed => [Final]
   }
 
-  AttrsPermittedAlways = [:owner_uuid, :state, :name, :description]
+  AttrsPermittedAlways = [:owner_uuid, :state, :name, :description, :properties]
   AttrsPermittedBeforeCommit = [:command, :container_count_max,
   :container_image, :cwd, :environment, :filters, :mounts,
-  :output_path, :priority, :properties,
+  :output_path, :priority,
   :runtime_constraints, :state, :container_uuid, :use_existing,
   :scheduling_parameters, :secret_mounts, :output_name, :output_ttl]
 
@@ -197,6 +198,18 @@ class ContainerRequest < ArvadosModel
     end
   end
 
+  def set_default_preemptable_scheduling_parameter
+    if self.state == Committed
+      # If preemptable instances (e.g., AWS Spot Instances) are allowed,
+      # ask for them on child containers by default.
+      if Rails.configuration.preemptable_instances and
+        !self.requesting_container_uuid.nil? and
+        self.scheduling_parameters['preemptable'].nil?
+          self.scheduling_parameters['preemptable'] = true
+      end
+    end
+  end
+
   def validate_runtime_constraints
     case self.state
     when Committed
@@ -223,6 +236,9 @@ class ContainerRequest < ArvadosModel
             scheduling_parameters['partitions'].size)
             errors.add :scheduling_parameters, "partitions must be an array of strings"
       end
+      if !Rails.configuration.preemptable_instances and scheduling_parameters['preemptable']
+        errors.add :scheduling_parameters, "preemptable instances are not allowed"
+      end
     end
   end
 
@@ -286,7 +302,6 @@ class ContainerRequest < ArvadosModel
   def update_priority
     return unless state_changed? || priority_changed? || container_uuid_changed?
     act_as_system_user do
-      ActiveRecord::Base.connection.execute('LOCK container_requests, containers IN EXCLUSIVE MODE')
       Container.
         where('uuid in (?)', [self.container_uuid_was, self.container_uuid].compact).
         map(&:update_priority!)
@@ -299,7 +314,6 @@ class ContainerRequest < ArvadosModel
 
   def set_requesting_container_uuid
     return if !current_api_client_authorization
-    ActiveRecord::Base.connection.execute('LOCK container_requests, containers IN EXCLUSIVE MODE')
     if (c = Container.where('auth_uuid=?', current_api_client_authorization.uuid).select([:uuid, :priority]).first)
       self.requesting_container_uuid = c.uuid
       self.priority = c.priority>0 ? 1 : 0
index fe183678c155855a19e6acd2fc4cf8089cd6749e..7a7f0a3a600643cd43afe4d0eca3a2f66ef2a2b1 100644 (file)
@@ -12,6 +12,8 @@ class Group < ArvadosModel
   include CanBeAnOwner
   include Trashable
 
+  serialize :properties, Hash
+
   after_create :invalidate_permissions_cache
   after_update :maybe_invalidate_permissions_cache
   before_create :assign_name
@@ -24,6 +26,7 @@ class Group < ArvadosModel
     t.add :delete_at
     t.add :trash_at
     t.add :is_trashed
+    t.add :properties
   end
 
   def maybe_invalidate_permissions_cache
index a1c35f10fcf1f9e1aae9ead9bf1cda00b5f2535a..19b6f9b25058b3d418e37126220287d2511aabda 100644 (file)
@@ -289,6 +289,11 @@ common:
   ### Crunch, DNS & compute node management
   ###
 
+  # Preemptable instance support (e.g. AWS Spot Instances)
+  # When true, child containers will get created with the preemptable
+  # scheduling parameter set.
+  preemptable_instances: false
+
   # Docker image to be used when none found in runtime_constraints of a job
   default_docker_image_for_jobs: false
 
index dfa08db1a9c82cf01e063f7963e8413bb2cccfd3..707c3dd946f377d7ada7513cf12a1ced9d896487 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 require 'migrate_yaml_to_json'
 
 class YamlToJson < ActiveRecord::Migration
index 003e5fb0929edfa490e4be82383f2ca7e0b75acb..921803a2970137c89cf8e19c8fdd329061f09c1c 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 require './db/migrate/20161213172944_full_text_search_indexes'
 
 class JsonCollectionProperties < ActiveRecord::Migration
index d90011c7c1db55ccf9d1c679245155c573fe0cca..aa42423a4dbd7ab9dc2d39ad13ca93e14dba485f 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 class AddIndexToContainers < ActiveRecord::Migration
   def up
     ActiveRecord::Base.connection.execute("CREATE INDEX index_containers_on_modified_at_uuid ON containers USING btree (modified_at desc, uuid asc)")
index b93dc54fcdb4eb4eba384fd57c5477c0a1a363d4..c9e50a64b79e56a64b1d90cfaee7be2c2dbf8900 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 class FixTrashFlagFollow < ActiveRecord::Migration
   def change
     ActiveRecord::Base.connection.execute("DROP MATERIALIZED VIEW materialized_permission_view")
index ce2403e743578f272f34cf360dfb544dc6f2132c..0183ef6dc51dc50d81df2999fa8bdd01539f8c14 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 class AddGinIndexToCollectionProperties < ActiveRecord::Migration
   def up
     ActiveRecord::Base.connection.execute("CREATE INDEX collection_index_on_properties ON collections USING gin (properties);")
index c56b7dcaf730cf715e24f87a717729a243249412..a161f633d8f9a7d5d9a2422650931bbb07c10fec 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 class AddSecretMountsToContainers < ActiveRecord::Migration
   def change
     add_column :container_requests, :secret_mounts, :jsonb, default: {}
index d577cbbb3eed43473484473d6edcd9f9a55e18b4..529126b299a701617ede80a1ba75fed1b05de280 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 class ChangeContainerPriorityBigint < ActiveRecord::Migration
   def change
     change_column :containers, :priority, :integer, limit: 8
index b2460ae18299f1cf2b444e1c7757d13b879b1e87..10b35a7aba03f5f5940e0309892567532819b1f2 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 class AddRedirectToUserUuidToUsers < ActiveRecord::Migration
   def up
     add_column :users, :redirect_to_user_uuid, :string
index 56cafea2c017083a6f86e0299983e7228cfcd372..79e777e0a61132808b56cb8eba1dc80e35e233fb 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 class AddContainerAuthUuidIndex < ActiveRecord::Migration
   def change
     add_index :containers, :auth_uuid
diff --git a/services/api/db/migrate/20180607175050_properties_to_jsonb.rb b/services/api/db/migrate/20180607175050_properties_to_jsonb.rb
new file mode 100644 (file)
index 0000000..988227a
--- /dev/null
@@ -0,0 +1,32 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require './db/migrate/20161213172944_full_text_search_indexes'
+
+class PropertiesToJsonb < ActiveRecord::Migration
+
+  @@tables_columns = [["nodes", "properties"],
+                      ["nodes", "info"],
+                      ["container_requests", "properties"],
+                      ["links", "properties"]]
+
+  def up
+    @@tables_columns.each do |table, column|
+      # Drop the FT index before changing column type to avoid
+      # "PG::DatatypeMismatch: ERROR: COALESCE types jsonb and text
+      # cannot be matched".
+      ActiveRecord::Base.connection.execute "DROP INDEX IF EXISTS #{table}_full_text_search_idx"
+      ActiveRecord::Base.connection.execute "ALTER TABLE #{table} ALTER COLUMN #{column} TYPE jsonb USING #{column}::jsonb"
+      ActiveRecord::Base.connection.execute "CREATE INDEX #{table}_index_on_#{column} ON #{table} USING gin (#{column})"
+    end
+    FullTextSearchIndexes.new.replace_index("container_requests")
+  end
+
+  def down
+    @@tables_columns.each do |table, column|
+      ActiveRecord::Base.connection.execute "DROP INDEX IF EXISTS #{table}_index_on_#{column}"
+      ActiveRecord::Base.connection.execute "ALTER TABLE #{table} ALTER COLUMN #{column} TYPE text"
+    end
+  end
+end
diff --git a/services/api/db/migrate/20180608123145_add_properties_to_groups.rb b/services/api/db/migrate/20180608123145_add_properties_to_groups.rb
new file mode 100644 (file)
index 0000000..12c6696
--- /dev/null
@@ -0,0 +1,18 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require './db/migrate/20161213172944_full_text_search_indexes'
+
+class AddPropertiesToGroups < ActiveRecord::Migration
+  def up
+    add_column :groups, :properties, :jsonb, default: {}
+    ActiveRecord::Base.connection.execute("CREATE INDEX group_index_on_properties ON groups USING gin (properties);")
+    FullTextSearchIndexes.new.replace_index('groups')
+  end
+
+  def down
+    ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS group_index_on_properties")
+    remove_column :groups, :properties
+  end
+end
index 0ab30c5e1cf0d2b7af810c0a264ae888a52ebeb5..a201a05aaf83a8efe52469f349e4c84fb75927f3 100644 (file)
@@ -277,7 +277,7 @@ CREATE TABLE container_requests (
     modified_by_user_uuid character varying(255),
     name character varying(255),
     description text,
-    properties text,
+    properties jsonb,
     state character varying(255),
     requesting_container_uuid character varying(255),
     container_uuid character varying(255),
@@ -396,7 +396,8 @@ CREATE TABLE groups (
     group_class character varying(255),
     trash_at timestamp without time zone,
     is_trashed boolean DEFAULT false NOT NULL,
-    delete_at timestamp without time zone
+    delete_at timestamp without time zone,
+    properties jsonb DEFAULT '{}'::jsonb
 );
 
 
@@ -682,7 +683,7 @@ CREATE TABLE links (
     link_class character varying(255),
     name character varying(255),
     head_uuid character varying(255),
-    properties text,
+    properties jsonb,
     updated_at timestamp without time zone NOT NULL
 );
 
@@ -853,9 +854,9 @@ CREATE TABLE nodes (
     ip_address character varying(255),
     first_ping_at timestamp without time zone,
     last_ping_at timestamp without time zone,
-    info text,
+    info jsonb,
     updated_at timestamp without time zone NOT NULL,
-    properties text,
+    properties jsonb,
     job_uuid character varying(255)
 );
 
@@ -1637,7 +1638,14 @@ CREATE INDEX collections_search_index ON collections USING btree (owner_uuid, mo
 -- Name: container_requests_full_text_search_idx; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX container_requests_full_text_search_idx ON container_requests USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text)) || ' '::text) || COALESCE(scheduling_parameters, ''::text)) || ' '::text) || (COALESCE(output_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output_name, ''::character varying))::text)));
+CREATE INDEX container_requests_full_text_search_idx ON container_requests USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE((properties)::text, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text)) || ' '::text) || COALESCE(scheduling_parameters, ''::text)) || ' '::text) || (COALESCE(output_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output_name, ''::character varying))::text)));
+
+
+--
+-- Name: container_requests_index_on_properties; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX container_requests_index_on_properties ON container_requests USING gin (properties);
 
 
 --
@@ -1654,11 +1662,18 @@ CREATE INDEX container_requests_search_index ON container_requests USING btree (
 CREATE INDEX containers_search_index ON containers USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, state, log, cwd, output_path, output, container_image, auth_uuid, locked_by_uuid);
 
 
+--
+-- Name: group_index_on_properties; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX group_index_on_properties ON groups USING gin (properties);
+
+
 --
 -- Name: groups_full_text_search_idx; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX groups_full_text_search_idx ON groups USING gin (to_tsvector('english'::regconfig, (((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(group_class, ''::character varying))::text)));
+CREATE INDEX groups_full_text_search_idx ON groups USING gin (to_tsvector('english'::regconfig, (((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(group_class, ''::character varying))::text) || ' '::text) || COALESCE((properties)::text, ''::text))));
 
 
 --
@@ -2613,6 +2628,13 @@ CREATE INDEX keep_disks_search_index ON keep_disks USING btree (uuid, owner_uuid
 CREATE INDEX keep_services_search_index ON keep_services USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, service_host, service_type);
 
 
+--
+-- Name: links_index_on_properties; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX links_index_on_properties ON links USING gin (properties);
+
+
 --
 -- Name: links_search_index; Type: INDEX; Schema: public; Owner: -
 --
@@ -2634,6 +2656,20 @@ CREATE UNIQUE INDEX links_tail_name_unique_if_link_class_name ON links USING btr
 CREATE INDEX logs_search_index ON logs USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, object_uuid, event_type, object_owner_uuid);
 
 
+--
+-- Name: nodes_index_on_info; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX nodes_index_on_info ON nodes USING gin (info);
+
+
+--
+-- Name: nodes_index_on_properties; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX nodes_index_on_properties ON nodes USING gin (properties);
+
+
 --
 -- Name: nodes_search_index; Type: INDEX; Schema: public; Owner: -
 --
@@ -3080,3 +3116,6 @@ INSERT INTO schema_migrations (version) VALUES ('20180501182859');
 
 INSERT INTO schema_migrations (version) VALUES ('20180514135529');
 
+INSERT INTO schema_migrations (version) VALUES ('20180608123145');
+
+INSERT INTO schema_migrations (version) VALUES ('20180607175050');
diff --git a/services/api/lib/update_priority.rb b/services/api/lib/update_priority.rb
new file mode 100644 (file)
index 0000000..724d2b2
--- /dev/null
@@ -0,0 +1,38 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+module UpdatePriority
+  # Clean up after races: if container priority>0 but there are no
+  # committed container requests for it, reset priority to 0.
+  def self.update_priority
+    if !File.owned?(Rails.root.join('tmp'))
+      Rails.logger.warn("UpdatePriority: not owner of #{Rails.root}/tmp, skipping")
+      return
+    end
+    lockfile = Rails.root.join('tmp', 'update_priority.lock')
+    File.open(lockfile, File::RDWR|File::CREAT, 0600) do |f|
+      return unless f.flock(File::LOCK_NB|File::LOCK_EX)
+      ActiveRecord::Base.connection.execute("UPDATE containers AS c SET priority=0 WHERE state='Queued' AND priority>0 AND uuid NOT IN (SELECT container_uuid FROM container_requests WHERE priority>0);")
+    end
+  end
+
+  def self.run_update_thread
+    need = false
+    Rails.cache.fetch('UpdatePriority', expires_in: 5.seconds) do
+      need = true
+    end
+    return if !need
+
+    Thread.new do
+      Thread.current.abort_on_exception = false
+      begin
+        update_priority
+      rescue => e
+        Rails.logger.error "#{e.class}: #{e}\n#{e.backtrace.join("\n\t")}"
+      ensure
+        ActiveRecord::Base.connection.close
+      end
+    end
+  end
+end
index 923083832c658627f02b6001b8f71ccfd47f6a59..d07027721f603565d3d6c66838fdd5ad666b95da 100644 (file)
@@ -99,7 +99,7 @@ class ArvadosModelTest < ActiveSupport::TestCase
                         properties: {'foo' => 'bar'}.with_indifferent_access)
     raw = ActiveRecord::Base.connection.
       select_value("select properties from links where uuid='#{link.uuid}'")
-    assert_equal '{"foo":"bar"}', raw
+    assert_equal '{"foo": "bar"}', raw
   end
 
   test "store long string" do
index 3483b874c6c71cd4db6185df6e600eca1c4169f0..b36ff06bbd551f3b2d0fe83468ad1db135ecd5b1 100644 (file)
@@ -757,6 +757,98 @@ class ContainerRequestTest < ActiveSupport::TestCase
     assert_equal ContainerRequest::Final, cr3.state
   end
 
+  [
+    [false, ActiveRecord::RecordInvalid],
+    [true, nil],
+  ].each do |preemptable_conf, expected|
+    test "having Rails.configuration.preemptable_instances=#{preemptable_conf}, create preemptable container request and verify #{expected}" do
+      sp = {"preemptable" => true}
+      common_attrs = {cwd: "test",
+                      priority: 1,
+                      command: ["echo", "hello"],
+                      output_path: "test",
+                      scheduling_parameters: sp,
+                      mounts: {"test" => {"kind" => "json"}}}
+      Rails.configuration.preemptable_instances = preemptable_conf
+      set_user_from_auth :active
+
+      cr = create_minimal_req!(common_attrs)
+      cr.state = ContainerRequest::Committed
+
+      if !expected.nil?
+        assert_raises(expected) do
+          cr.save!
+        end
+      else
+        cr.save!
+        assert_equal sp, cr.scheduling_parameters
+      end
+    end
+  end
+
+  [
+    'zzzzz-dz642-runningcontainr',
+    nil,
+  ].each do |requesting_c|
+    test "having preemptable instances active on the API server, a committed #{requesting_c.nil? ? 'non-':''}child CR should not ask for preemptable instance if parameter already set to false" do
+      common_attrs = {cwd: "test",
+                      priority: 1,
+                      command: ["echo", "hello"],
+                      output_path: "test",
+                      scheduling_parameters: {"preemptable" => false},
+                      mounts: {"test" => {"kind" => "json"}}}
+
+      Rails.configuration.preemptable_instances = true
+      set_user_from_auth :active
+
+      if requesting_c
+        cr = with_container_auth(Container.find_by_uuid requesting_c) do
+          create_minimal_req!(common_attrs)
+        end
+        assert_not_nil cr.requesting_container_uuid
+      else
+        cr = create_minimal_req!(common_attrs)
+      end
+
+      cr.state = ContainerRequest::Committed
+      cr.save!
+
+      assert_equal false, cr.scheduling_parameters['preemptable']
+    end
+  end
+
+  [
+    [true, 'zzzzz-dz642-runningcontainr', true],
+    [true, nil, nil],
+    [false, 'zzzzz-dz642-runningcontainr', nil],
+    [false, nil, nil],
+  ].each do |preemptable_conf, requesting_c, schedule_preemptable|
+    test "having Rails.configuration.preemptable_instances=#{preemptable_conf}, #{requesting_c.nil? ? 'non-':''}child CR should #{schedule_preemptable ? '':'not'} ask for preemptable instance by default" do
+      common_attrs = {cwd: "test",
+                      priority: 1,
+                      command: ["echo", "hello"],
+                      output_path: "test",
+                      mounts: {"test" => {"kind" => "json"}}}
+
+      Rails.configuration.preemptable_instances = preemptable_conf
+      set_user_from_auth :active
+
+      if requesting_c
+        cr = with_container_auth(Container.find_by_uuid requesting_c) do
+          create_minimal_req!(common_attrs)
+        end
+        assert_not_nil cr.requesting_container_uuid
+      else
+        cr = create_minimal_req!(common_attrs)
+      end
+
+      cr.state = ContainerRequest::Committed
+      cr.save!
+
+      assert_equal schedule_preemptable, cr.scheduling_parameters['preemptable']
+    end
+  end
+
   [
     [{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
     [{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Uncommitted],
index 9e3baab95080d8e578792b8cf4ab1beff15d4425..d1f19dd7b5702e2431471bc7ce2164f553a8cb11 100644 (file)
@@ -7,6 +7,7 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
+       "bytes"
        "context"
        "flag"
        "fmt"
@@ -274,8 +275,21 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                log.Printf("Submitting container %s to slurm", ctr.UUID)
                if err := disp.submit(ctr, disp.CrunchRunCommand); err != nil {
                        var text string
-                       if err == dispatchcloud.ErrConstraintsNotSatisfiable {
-                               text = fmt.Sprintf("cannot run container %s: %s", ctr.UUID, err)
+                       if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+                               var logBuf bytes.Buffer
+                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
+                               if len(err.AvailableTypes) == 0 {
+                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
+                               } else {
+                                       fmt.Fprint(&logBuf, "Available instance types:\n")
+                                       for _, t := range err.AvailableTypes {
+                                               fmt.Fprintf(&logBuf,
+                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
+                                               )
+                                       }
+                               }
+                               text = logBuf.String()
                                disp.UpdateState(ctr.UUID, dispatch.Cancelled)
                        } else {
                                text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
index 85617cf1154c2f1e32c6cf5edd6f20dd1538762a..b4033e78b00abee87e2fb7423281021be5233577 100644 (file)
@@ -391,16 +391,18 @@ func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
                        types: []arvados.InstanceType{
                                {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
                        },
-                       err: dispatchcloud.ErrConstraintsNotSatisfiable,
+                       err: dispatchcloud.ConstraintsNotSatisfiableError{},
                },
        } {
                c.Logf("%#v", trial)
                s.disp.cluster = &arvados.Cluster{InstanceTypes: trial.types}
 
                args, err := s.disp.sbatchArgs(container)
-               c.Check(err, Equals, trial.err)
+               c.Check(err == nil, Equals, trial.err == nil)
                if trial.err == nil {
                        c.Check(args, DeepEquals, append([]string{"--job-name=123", "--nice=10000"}, trial.sbatchArgs...))
+               } else {
+                       c.Check(len(err.(dispatchcloud.ConstraintsNotSatisfiableError).AvailableTypes), Equals, len(trial.types))
                }
        }
 }
index 2ecc8726f5e54d91518b3f20c08eb8f1dec41852..c312a532e44f43d63fa65b1d6ff6e7af9028a924 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
index d7c9082a48d32d380713fa548255b83d603f907d..5a1aa809146db0f4b5a89e32390877963302e9e6 100644 (file)
@@ -42,7 +42,7 @@ setup(name='arvados_fuse',
         # llfuse 1.3.4 fails to install via pip
         'llfuse >=1.2, <1.3.4',
         'python-daemon',
-        'ciso8601 >=1.0.6',
+        'ciso8601 >=1.0.6, <2.0.0',
         'setuptools'
         ],
       test_suite='tests',
index 496fb884d433a5eea350d27818d5e72629e9242f..376d4830b153b85cd82df1220902059b1aa2e4ac 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
index 7b25d78852bde9a575242331897bc26bc8cb092b..d86234a936cc96702f3a79d12c10d04548c0faa2 100644 (file)
@@ -95,6 +95,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                        return
                }
        }
+       bal.cleanupMounts()
 
        if err = bal.CheckSanityEarly(&config.Client); err != nil {
                return
@@ -169,6 +170,38 @@ func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) e
        })
 }
 
+// cleanupMounts normalizes the mount lists of bal.KeepServices: it
+// drops every read-only mount whose device is also mounted
+// read-write (on any server), and replaces any non-positive
+// reported replication level with 1.
+func (bal *Balancer) cleanupMounts() {
+       // rwdev maps each non-empty device ID to a server that has
+       // it mounted read-write.
+       rwdev := map[string]*KeepService{}
+       for _, srv := range bal.KeepServices {
+               for _, mnt := range srv.mounts {
+                       if !mnt.ReadOnly && mnt.DeviceID != "" {
+                               rwdev[mnt.DeviceID] = srv
+                       }
+               }
+       }
+       // Drop the readonly mounts whose device is mounted RW
+       // elsewhere. Mounts with an empty DeviceID are never in
+       // rwdev, so they are always kept.
+       for _, srv := range bal.KeepServices {
+               var dedup []*KeepMount
+               for _, mnt := range srv.mounts {
+                       if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
+                               bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
+                       } else {
+                               dedup = append(dedup, mnt)
+                       }
+               }
+               srv.mounts = dedup
+       }
+       // Treat a missing or invalid replication level as 1. Log
+       // through bal.logf like the rest of this method, not the
+       // global logger.
+       for _, srv := range bal.KeepServices {
+               for _, mnt := range srv.mounts {
+                       if mnt.Replication <= 0 {
+                               bal.logf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
+                               mnt.Replication = 1
+                       }
+               }
+       }
+}
+
 // CheckSanityEarly checks for configuration and runtime errors that
 // can be detected before GetCurrentState() and ComputeChangeSets()
 // are called.
@@ -249,32 +282,54 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
        errs := make(chan error, 2+len(bal.KeepServices))
        wg := sync.WaitGroup{}
 
-       // Start one goroutine for each KeepService: retrieve the
-       // index, and add the returned blocks to BlockStateMap.
+       // When a device is mounted more than once, we will get its
+       // index only once, and call AddReplicas on all of the mounts.
+       // equivMount keys are the mounts that will be indexed, and
+       // each value is a list of mounts to apply the received index
+       // to.
+       equivMount := map[*KeepMount][]*KeepMount{}
+       // deviceMount maps each device ID to the one mount that will
+       // be indexed for that device.
+       deviceMount := map[string]*KeepMount{}
        for _, srv := range bal.KeepServices {
+               for _, mnt := range srv.mounts {
+                       equiv := deviceMount[mnt.DeviceID]
+                       if equiv == nil {
+                               equiv = mnt
+                               if mnt.DeviceID != "" {
+                                       deviceMount[mnt.DeviceID] = equiv
+                               }
+                       }
+                       equivMount[equiv] = append(equivMount[equiv], mnt)
+               }
+       }
+
+       // Start one goroutine for each (non-redundant) mount:
+       // retrieve the index, and add the returned blocks to
+       // BlockStateMap.
+       for _, mounts := range equivMount {
                wg.Add(1)
-               go func(srv *KeepService) {
+               go func(mounts []*KeepMount) {
                        defer wg.Done()
-                       bal.logf("%s: retrieve indexes", srv)
-                       for _, mount := range srv.mounts {
-                               bal.logf("%s: retrieve index", mount)
-                               idx, err := srv.IndexMount(c, mount.UUID, "")
-                               if err != nil {
-                                       errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
-                                       return
-                               }
-                               if len(errs) > 0 {
-                                       // Some other goroutine encountered an
-                                       // error -- any further effort here
-                                       // will be wasted.
-                                       return
-                               }
+                       bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
+                       idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+                       if err != nil {
+                               errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err)
+                               return
+                       }
+                       if len(errs) > 0 {
+                               // Some other goroutine encountered an
+                               // error -- any further effort here
+                               // will be wasted.
+                               return
+                       }
+                       for _, mount := range mounts {
                                bal.logf("%s: add %d replicas to map", mount, len(idx))
                                bal.BlockStateMap.AddReplicas(mount, idx)
-                               bal.logf("%s: done", mount)
+                               bal.logf("%s: added %d replicas", mount, len(idx))
                        }
-                       bal.logf("%s: done", srv)
-               }(srv)
+                       bal.logf("mount %s: index done", mounts[0])
+               }(mounts)
        }
 
        // collQ buffers incoming collections so we can start fetching
@@ -365,27 +420,29 @@ func (bal *Balancer) ComputeChangeSets() {
                blkid arvados.SizedDigest
                blk   *BlockState
        }
-       nWorkers := 1 + runtime.NumCPU()
-       todo := make(chan balanceTask, nWorkers)
-       results := make(chan balanceResult, 16)
-       var wg sync.WaitGroup
-       for i := 0; i < nWorkers; i++ {
-               wg.Add(1)
-               go func() {
-                       for work := range todo {
-                               results <- bal.balanceBlock(work.blkid, work.blk)
+       workers := runtime.GOMAXPROCS(-1)
+       todo := make(chan balanceTask, workers)
+       go func() {
+               bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+                       todo <- balanceTask{
+                               blkid: blkid,
+                               blk:   blk,
                        }
-                       wg.Done()
-               }()
-       }
-       bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
-               todo <- balanceTask{
-                       blkid: blkid,
-                       blk:   blk,
-               }
-       })
-       close(todo)
+               })
+               close(todo)
+       }()
+       results := make(chan balanceResult, workers)
        go func() {
+               var wg sync.WaitGroup
+               for i := 0; i < workers; i++ {
+                       wg.Add(1)
+                       go func() {
+                               for work := range todo {
+                                       results <- bal.balanceBlock(work.blkid, work.blk)
+                               }
+                               wg.Done()
+                       }()
+               }
                wg.Wait()
                close(results)
        }()
@@ -501,10 +558,14 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
        for _, class := range bal.classes {
                desired := blk.Desired[class]
 
+               countedDev := map[string]bool{}
                have := 0
                for _, slot := range slots {
-                       if slot.repl != nil && bal.mountsByClass[class][slot.mnt] {
-                               have++
+                       if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
+                               have += slot.mnt.Replication
+                               if slot.mnt.DeviceID != "" {
+                                       countedDev[slot.mnt.DeviceID] = true
+                               }
                        }
                }
                classState[class] = balancedBlockState{
@@ -549,32 +610,48 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                        }
                })
 
-               // Servers and mounts (with or without existing
+               // Servers/mounts/devices (with or without existing
                // replicas) that are part of the best achievable
                // layout for this storage class.
                wantSrv := map[*KeepService]bool{}
                wantMnt := map[*KeepMount]bool{}
+               wantDev := map[string]bool{}
                // Positions (with existing replicas) that have been
                // protected (via unsafeToDelete) to ensure we don't
                // reduce replication below desired level when
                // trashing replicas that aren't optimal positions for
                // any storage class.
                protMnt := map[*KeepMount]bool{}
+               // Replication planned so far (corresponds to wantMnt).
+               replWant := 0
+               // Protected replication (corresponds to protMnt).
+               replProt := 0
 
                // trySlot tries using a slot to meet requirements,
                // and returns true if all requirements are met.
                trySlot := func(i int) bool {
                        slot := slots[i]
-                       if len(protMnt) < desired && slot.repl != nil {
+                       if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
+                               // Already allocated a replica to this
+                               // backend device, possibly on a
+                               // different server.
+                               return false
+                       }
+                       if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
                                unsafeToDelete[slot.repl.Mtime] = true
                                protMnt[slot.mnt] = true
+                               replProt += slot.mnt.Replication
                        }
-                       if len(wantMnt) < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
+                       if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
                                slots[i].want = true
                                wantSrv[slot.mnt.KeepService] = true
                                wantMnt[slot.mnt] = true
+                               if slot.mnt.DeviceID != "" {
+                                       wantDev[slot.mnt.DeviceID] = true
+                               }
+                               replWant += slot.mnt.Replication
                        }
-                       return len(protMnt) >= desired && len(wantMnt) >= desired
+                       return replProt >= desired && replWant >= desired
                }
 
                // First try to achieve desired replication without
@@ -601,7 +678,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                                if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
                                        continue
                                }
-                               if safe++; safe >= desired {
+                               if safe += slot.mnt.Replication; safe >= desired {
                                        break
                                }
                        }
@@ -617,19 +694,36 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                        cs.unachievable = true
                        classState[class] = cs
                }
+
+               // Avoid deleting wanted replicas from devices that
+               // are mounted on multiple servers -- even if they
+               // haven't already been added to unsafeToDelete
+               // because the servers report different Mtimes.
+               for _, slot := range slots {
+                       if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
+                               unsafeToDelete[slot.repl.Mtime] = true
+                       }
+               }
        }
 
        // TODO: If multiple replicas are trashable, prefer the oldest
        // replica that doesn't have a timestamp collision with
        // others.
 
+       countedDev := map[string]bool{}
        var have, want int
        for _, slot := range slots {
+               if countedDev[slot.mnt.DeviceID] {
+                       continue
+               }
                if slot.want {
-                       want++
+                       want += slot.mnt.Replication
                }
                if slot.repl != nil {
-                       have++
+                       have += slot.mnt.Replication
+               }
+               if slot.mnt.DeviceID != "" {
+                       countedDev[slot.mnt.DeviceID] = true
                }
        }
 
@@ -771,7 +865,7 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
                case surplus > 0:
                        s.overrep.replicas += surplus
                        s.overrep.blocks++
-                       s.overrep.bytes += bytes * int64(len(result.blk.Replicas)-result.want)
+                       s.overrep.bytes += bytes * int64(result.have-result.want)
                default:
                        s.justright.replicas += result.want
                        s.justright.blocks++
@@ -783,16 +877,16 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
                        s.desired.blocks++
                        s.desired.bytes += bytes * int64(result.want)
                }
-               if len(result.blk.Replicas) > 0 {
-                       s.current.replicas += len(result.blk.Replicas)
+               if result.have > 0 {
+                       s.current.replicas += result.have
                        s.current.blocks++
-                       s.current.bytes += bytes * int64(len(result.blk.Replicas))
+                       s.current.bytes += bytes * int64(result.have)
                }
 
-               for len(s.replHistogram) <= len(result.blk.Replicas) {
+               for len(s.replHistogram) <= result.have {
                        s.replHistogram = append(s.replHistogram, 0)
                }
-               s.replHistogram[len(result.blk.Replicas)]++
+               s.replHistogram[result.have]++
        }
        for _, srv := range bal.KeepServices {
                s.pulls += len(srv.ChangeSet.Pulls)
index cfdd47fc9126db5b4455b7d8b747f3fcb51e766c..2e664bedfb19fe8054d39083e6ee4f5cf6e477c6 100644 (file)
@@ -49,6 +49,8 @@ type tester struct {
 
        shouldPullMounts  []string
        shouldTrashMounts []string
+
+       expectResult balanceResult
 }
 
 func (bal *balancerSuite) SetUpSuite(c *check.C) {
@@ -90,6 +92,7 @@ func (bal *balancerSuite) SetUpTest(c *check.C) {
        }
 
        bal.MinMtime = time.Now().UnixNano() - bal.signatureTTL*1e9
+       bal.cleanupMounts()
 }
 
 func (bal *balancerSuite) TestPerfect(c *check.C) {
@@ -245,6 +248,198 @@ func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
                shouldTrash: slots{2}})
 }
 
+func (bal *balancerSuite) TestCleanupMounts(c *check.C) {
+       bal.srvs[3].mounts[0].KeepMount.ReadOnly = true
+       bal.srvs[3].mounts[0].KeepMount.DeviceID = "abcdef"
+       bal.srvs[14].mounts[0].KeepMount.DeviceID = "abcdef"
+       c.Check(len(bal.srvs[3].mounts), check.Equals, 1)
+       bal.cleanupMounts()
+       c.Check(len(bal.srvs[3].mounts), check.Equals, 0)
+       bal.try(c, tester{
+               known:      0,
+               desired:    map[string]int{"default": 2},
+               current:    slots{1},
+               shouldPull: slots{2}})
+}
+
+func (bal *balancerSuite) TestVolumeReplication(c *check.C) {
+       bal.srvs[0].mounts[0].KeepMount.Replication = 2  // srv 0
+       bal.srvs[14].mounts[0].KeepMount.Replication = 2 // srv e
+       bal.cleanupMounts()
+       // block 0 rendezvous is 3,e,a -- so slot 1 has repl=2
+       bal.try(c, tester{
+               known:      0,
+               desired:    map[string]int{"default": 2},
+               current:    slots{1},
+               shouldPull: slots{0}})
+       bal.try(c, tester{
+               known:      0,
+               desired:    map[string]int{"default": 2},
+               current:    slots{0, 1},
+               shouldPull: nil})
+       bal.try(c, tester{
+               known:       0,
+               desired:     map[string]int{"default": 2},
+               current:     slots{0, 1, 2},
+               shouldTrash: slots{2}})
+       bal.try(c, tester{
+               known:       0,
+               desired:     map[string]int{"default": 3},
+               current:     slots{0, 2, 3, 4},
+               shouldPull:  slots{1},
+               shouldTrash: slots{4},
+               expectResult: balanceResult{
+                       have: 4,
+                       want: 3,
+                       classState: map[string]balancedBlockState{"default": {
+                               desired:      3,
+                               surplus:      1,
+                               unachievable: false}}}})
+       bal.try(c, tester{
+               known:       0,
+               desired:     map[string]int{"default": 3},
+               current:     slots{0, 1, 2, 3, 4},
+               shouldTrash: slots{2, 3, 4}})
+       bal.try(c, tester{
+               known:       0,
+               desired:     map[string]int{"default": 4},
+               current:     slots{0, 1, 2, 3, 4},
+               shouldTrash: slots{3, 4},
+               expectResult: balanceResult{
+                       have: 6,
+                       want: 4,
+                       classState: map[string]balancedBlockState{"default": {
+                               desired:      4,
+                               surplus:      2,
+                               unachievable: false}}}})
+       // block 1 rendezvous is 0,9,7 -- so slot 0 has repl=2
+       bal.try(c, tester{
+               known:   1,
+               desired: map[string]int{"default": 2},
+               current: slots{0},
+               expectResult: balanceResult{
+                       have: 2,
+                       want: 2,
+                       classState: map[string]balancedBlockState{"default": {
+                               desired:      2,
+                               surplus:      0,
+                               unachievable: false}}}})
+       bal.try(c, tester{
+               known:      1,
+               desired:    map[string]int{"default": 3},
+               current:    slots{0},
+               shouldPull: slots{1}})
+       bal.try(c, tester{
+               known:      1,
+               desired:    map[string]int{"default": 4},
+               current:    slots{0},
+               shouldPull: slots{1, 2}})
+       bal.try(c, tester{
+               known:      1,
+               desired:    map[string]int{"default": 4},
+               current:    slots{2},
+               shouldPull: slots{0, 1}})
+       bal.try(c, tester{
+               known:      1,
+               desired:    map[string]int{"default": 4},
+               current:    slots{7},
+               shouldPull: slots{0, 1, 2},
+               expectResult: balanceResult{
+                       have: 1,
+                       want: 4,
+                       classState: map[string]balancedBlockState{"default": {
+                               desired:      4,
+                               surplus:      -3,
+                               unachievable: false}}}})
+       bal.try(c, tester{
+               known:       1,
+               desired:     map[string]int{"default": 2},
+               current:     slots{1, 2, 3, 4},
+               shouldPull:  slots{0},
+               shouldTrash: slots{3, 4}})
+       bal.try(c, tester{
+               known:       1,
+               desired:     map[string]int{"default": 2},
+               current:     slots{0, 1, 2},
+               shouldTrash: slots{1, 2},
+               expectResult: balanceResult{
+                       have: 4,
+                       want: 2,
+                       classState: map[string]balancedBlockState{"default": {
+                               desired:      2,
+                               surplus:      2,
+                               unachievable: false}}}})
+}
+
+func (bal *balancerSuite) TestDeviceRWMountedByMultipleServers(c *check.C) {
+       bal.srvs[0].mounts[0].KeepMount.DeviceID = "abcdef"
+       bal.srvs[9].mounts[0].KeepMount.DeviceID = "abcdef"
+       bal.srvs[14].mounts[0].KeepMount.DeviceID = "abcdef"
+       // block 0 belongs on servers 3 and e, which have different
+       // device IDs.
+       bal.try(c, tester{
+               known:      0,
+               desired:    map[string]int{"default": 2},
+               current:    slots{1},
+               shouldPull: slots{0}})
+       // block 1 belongs on servers 0 and 9, which both report
+       // having a replica, but the replicas are on the same device
+       // ID -- so we should pull to the third position (7).
+       bal.try(c, tester{
+               known:      1,
+               desired:    map[string]int{"default": 2},
+               current:    slots{0, 1},
+               shouldPull: slots{2}})
+       // block 1 can be pulled to the doubly-mounted device, but the
+       // pull should only be done on the first of the two servers.
+       bal.try(c, tester{
+               known:      1,
+               desired:    map[string]int{"default": 2},
+               current:    slots{2},
+               shouldPull: slots{0}})
+       // block 0 has one replica on a single device mounted on two
+       // servers (e,9 at positions 1,9). Trashing the replica on 9
+       // would lose the block.
+       bal.try(c, tester{
+               known:      0,
+               desired:    map[string]int{"default": 2},
+               current:    slots{1, 9},
+               shouldPull: slots{0},
+               expectResult: balanceResult{
+                       have: 1,
+                       classState: map[string]balancedBlockState{"default": {
+                               desired:      2,
+                               surplus:      -1,
+                               unachievable: false}}}})
+       // block 0 is overreplicated, but the second and third
+       // replicas are the same replica according to DeviceID
+       // (despite different Mtimes). Don't trash the third replica.
+       bal.try(c, tester{
+               known:   0,
+               desired: map[string]int{"default": 2},
+               current: slots{0, 1, 9},
+               expectResult: balanceResult{
+                       have: 2,
+                       classState: map[string]balancedBlockState{"default": {
+                               desired:      2,
+                               surplus:      0,
+                               unachievable: false}}}})
+       // block 0 is overreplicated; the third and fifth replicas are
+       // extra, but the fourth is another view of the second and
+       // shouldn't be trashed.
+       bal.try(c, tester{
+               known:       0,
+               desired:     map[string]int{"default": 2},
+               current:     slots{0, 1, 5, 9, 12},
+               shouldTrash: slots{5, 12},
+               expectResult: balanceResult{
+                       have: 4,
+                       classState: map[string]balancedBlockState{"default": {
+                               desired:      2,
+                               surplus:      2,
+                               unachievable: false}}}})
+}
+
 func (bal *balancerSuite) TestChangeStorageClasses(c *check.C) {
        // For known blocks 0/1/2/3, server 9 is slot 9/1/14/0 in
        // probe order. For these tests we give it two mounts, one
@@ -373,7 +568,7 @@ func (bal *balancerSuite) try(c *check.C, t tester) {
        for _, srv := range bal.srvs {
                srv.ChangeSet = &ChangeSet{}
        }
-       bal.balanceBlock(knownBlkid(t.known), blk)
+       result := bal.balanceBlock(knownBlkid(t.known), blk)
 
        var didPull, didTrash slots
        var didPullMounts, didTrashMounts []string
@@ -409,6 +604,15 @@ func (bal *balancerSuite) try(c *check.C, t tester) {
                sort.Strings(didTrashMounts)
                c.Check(didTrashMounts, check.DeepEquals, t.shouldTrashMounts)
        }
+       if t.expectResult.have > 0 {
+               c.Check(result.have, check.Equals, t.expectResult.have)
+       }
+       if t.expectResult.want > 0 {
+               c.Check(result.want, check.Equals, t.expectResult.want)
+       }
+       if t.expectResult.classState != nil {
+               c.Check(result.classState, check.DeepEquals, t.expectResult.classState)
+       }
 }
 
 // srvList returns the KeepServices, sorted in rendezvous order and
index 947033564df01e479d05682617fc041417e5d54f..90235cbf3188d91bc274412ddd5522dc639fa812 100644 (file)
@@ -9,6 +9,7 @@ import (
        "flag"
        "fmt"
        "log"
+       "net/http"
        "os"
        "os/signal"
        "syscall"
@@ -45,6 +46,9 @@ type Config struct {
        // more memory, but can reduce store-and-forward latency when
        // fetching pages)
        CollectionBuffers int
+
+       // Timeout for outgoing http request/response cycle.
+       RequestTimeout arvados.Duration
 }
 
 // RunOptions controls runtime behavior. The flags/options that belong
@@ -107,6 +111,14 @@ func main() {
                log.Fatal(config.DumpAndExit(cfg))
        }
 
+       to := time.Duration(cfg.RequestTimeout)
+       if to == 0 {
+               to = 30 * time.Minute
+       }
+       arvados.DefaultSecureClient.Timeout = to
+       arvados.InsecureHTTPClient.Timeout = to
+       http.DefaultClient.Timeout = to
+
        log.Printf("keep-balance %s started", version)
 
        if *debugFlag {
index 0f4effe6f4e9b7c4e2590cfeb48ef5cd729ec5cd..4c7d5067182fe89783e104c56063fdaf86545c1b 100644 (file)
@@ -19,7 +19,8 @@ KeepServiceTypes:
     - disk
 RunPeriod: 600s
 CollectionBatchSize: 100000
-CollectionBuffers: 1000`)
+CollectionBuffers: 1000
+RequestTimeout: 30m`)
 
 func usage() {
        fmt.Fprintf(os.Stderr, `
@@ -86,6 +87,11 @@ Tuning resource usage:
     while the current page is still being processed. If this is zero
     or omitted, pages are processed serially.
 
+    RequestTimeout is the maximum time keep-balance will spend on a
+    single HTTP request (getting a page of collections, getting the
+    block index from a keepstore server, or sending a trash or pull
+    list to a keepstore server). Defaults to 30 minutes.
+
 Limitations:
 
     keep-balance does not attempt to discover whether committed pull
index 52db776a4319bf34846c23dfed0a8f8b63fe757b..473171e1f5c41c0dd371844e4432c46d588e29ab 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import "golang.org/x/net/webdav"
index 16177064928c2509f1d5ff8c227cff2411a4c821..07fc63b63f8c34b9b5a3d6c49dacdc1d44f36f88 100644 (file)
@@ -487,6 +487,15 @@ func (h *proxyHandler) Put(resp http.ResponseWriter, req *http.Request) {
 
        locatorIn := mux.Vars(req)["locator"]
 
+       // Check if the client specified storage classes
+       if req.Header.Get("X-Keep-Storage-Classes") != "" {
+               var scl []string
+               for _, sc := range strings.Split(req.Header.Get("X-Keep-Storage-Classes"), ",") {
+                       scl = append(scl, strings.Trim(sc, " "))
+               }
+               kc.StorageClasses = scl
+       }
+
        _, err = fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
        if err != nil || expectLength < 0 {
                err = LengthRequiredError
index 65e22e3b3ed3d761530089a33d2cd712dc9550b4..e87fa4afd0db660c16af8a7ec78e68027620c531 100644 (file)
@@ -162,6 +162,33 @@ func (s *ServerRequiredSuite) TestLoopDetection(c *C) {
        c.Check(err, ErrorMatches, `.*loop detected.*`)
 }
 
+func (s *ServerRequiredSuite) TestStorageClassesHeader(c *C) {
+       kc := runProxy(c, nil, false)
+       defer closeListener()
+
+       // Set up fake keepstore to record request headers
+       var hdr http.Header
+       ts := httptest.NewServer(http.HandlerFunc(
+               func(w http.ResponseWriter, r *http.Request) {
+                       hdr = r.Header
+                       http.Error(w, "Error", http.StatusInternalServerError)
+               }))
+       defer ts.Close()
+
+       // Point keepproxy router's keepclient to the fake keepstore
+       sr := map[string]string{
+               TestProxyUUID: ts.URL,
+       }
+       router.(*proxyHandler).KeepClient.SetServiceRoots(sr, sr, sr)
+
+       // Set up client to ask for storage classes to keepproxy
+       kc.StorageClasses = []string{"secure"}
+       content := []byte("Very important data")
+       _, _, err := kc.PutB(content)
+       c.Check(err, NotNil)
+       c.Check(hdr.Get("X-Keep-Storage-Classes"), Equals, "secure")
+}
+
 func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
        kc := runProxy(c, nil, false)
        defer closeListener()
@@ -587,30 +614,29 @@ func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
 }
 
 func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, Equals, nil)
+       kc := runProxy(c, nil, false)
+       defer closeListener()
 
-       // keepclient with no such keep server
-       kc := keepclient.New(arv)
+       // Point keepproxy to a nonexistent keepstore
        locals := map[string]string{
                TestProxyUUID: "http://localhost:12345",
        }
-       kc.SetServiceRoots(locals, nil, nil)
+       router.(*proxyHandler).KeepClient.SetServiceRoots(locals, nil, nil)
 
-       // Ask should result in temporary connection refused error
+       // Ask should result in temporary bad gateway error
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
-       _, _, err = kc.Ask(hash)
+       _, _, err := kc.Ask(hash)
        c.Check(err, NotNil)
        errNotFound, _ := err.(*keepclient.ErrNotFound)
        c.Check(errNotFound.Temporary(), Equals, true)
-       c.Assert(err, ErrorMatches, ".*connection refused.*")
+       c.Assert(err, ErrorMatches, ".*HTTP 502.*")
 
-       // Get should result in temporary connection refused error
+       // Get should result in temporary bad gateway error
        _, _, _, err = kc.Get(hash)
        c.Check(err, NotNil)
        errNotFound, _ = err.(*keepclient.ErrNotFound)
        c.Check(errNotFound.Temporary(), Equals, true)
-       c.Assert(err, ErrorMatches, ".*connection refused.*")
+       c.Assert(err, ErrorMatches, ".*HTTP 502.*")
 }
 
 func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
index 9d4d8019282ebf01160544d940345b36fe892076..bdab58927bdc243605b8cf1d7e95b34d2f610272 100644 (file)
@@ -429,7 +429,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
        case <-ctx.Done():
                theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
                // Our pipe might be stuck in Write(), waiting for
-               // io.Copy() to read. If so, un-stick it. This means
+               // PutReader() to read. If so, un-stick it. This means
                // PutReader will get corrupt data, but that's OK: the
                // size and MD5 won't match, so the write will fail.
                go io.Copy(ioutil.Discard, bufr)
@@ -438,6 +438,8 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
                theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
                return ctx.Err()
        case <-ready:
+               // Unblock pipe in case PutReader did not consume it.
+               io.Copy(ioutil.Discard, bufr)
                return v.translateError(err)
        }
 }
index 3c04118abe2ec2bb3f5fee4c1a74e078d61119ed..b124c66540aab804db3ad555b048c9ef5097e8d0 100644 (file)
@@ -33,7 +33,7 @@ def arvados_timestamp(timestr):
         subsecs = float(subsec_match.group(1))
         timestr = timestr[:subsec_match.start()] + 'Z'
     return calendar.timegm(time.strptime(timestr + 'UTC',
-                                         ARVADOS_TIMEFMT + '%Z'))
+                                         ARVADOS_TIMEFMT + '%Z')) + subsecs
 
 def timestamp_fresh(timestamp, fresh_time):
     return (time.time() - timestamp) < fresh_time
index 9106ea67ccc8ffac7813d64baa5ebc537548fa21..b4fec5096d5a8e2767169fce3910f45136ddaee3 100644 (file)
@@ -130,7 +130,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     @RetryMixin._retry()
     def create_cloud_node(self):
         self._logger.info("Sending create_node request for node size %s.",
-                          self.cloud_size.name)
+                          self.cloud_size.id)
         try:
             self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                       self.arvados_node)
@@ -432,6 +432,11 @@ class ComputeNodeMonitorActor(config.actor_class):
         reason for the decision.
         """
 
+        # If this node's size is invalid (because it has a stale arvados_node_size
+        # tag), return True so that it's properly shut down.
+        if self.cloud_node.size.id == 'invalid':
+            return (True, "node's size tag '%s' not recognizable" % (self.cloud_node.extra['arvados_node_size'],))
+
         # Collect states and then consult state transition table whether we
         # should shut down.  Possible states are:
         # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
index 7ed7435553647fdc55958337e6d2461345c4098d..9e38d13eb7f4788d8af485a7e5b4b6589c9f324c 100644 (file)
@@ -174,7 +174,7 @@ class BaseComputeNodeDriver(RetryMixin):
         try:
             kwargs = self.create_kwargs.copy()
             kwargs.update(self.arvados_create_kwargs(size, arvados_node))
-            kwargs['size'] = size
+            kwargs['size'] = size.real
             return self.real.create_node(**kwargs)
         except CLOUD_ERRORS as create_error:
             # Workaround for bug #6702: sometimes the create node request
index e0f260ab86542252102e28459381505833998d10..ae554327ca20d929a92b595da54e32ba05e6485f 100644 (file)
@@ -46,6 +46,8 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
 
     def arvados_create_kwargs(self, size, arvados_node):
         tags = {
+            # Set up a tag indicating the Arvados-assigned cloud size id.
+            'arvados_node_size': size.id,
             'booted_at': time.strftime(ARVADOS_TIMEFMT, time.gmtime()),
             'arv-ping-url': self._make_ping_url(arvados_node)
         }
@@ -83,11 +85,12 @@ echo %s > /var/tmp/arv-node-data/meta-data/instance-type
         # Do our own filtering based on tag.
         nodes = [node for node in
                 super(ComputeNodeDriver, self).list_nodes(ex_fetch_nic=False, ex_fetch_power_state=False)
-                if node.extra["tags"].get("arvados-class") == self.tags["arvados-class"]]
+                if node.extra.get("tags", {}).get("arvados-class") == self.tags["arvados-class"]]
         for n in nodes:
             # Need to populate Node.size
             if not n.size:
                 n.size = self.sizes[n.extra["properties"]["hardwareProfile"]["vmSize"]]
+            n.extra['arvados_node_size'] = n.extra.get('tags', {}).get('arvados_node_size')
         return nodes
 
     def broken(self, cloud_node):
index 9300645c38f47b74d780e605d32e37134df0c15a..c453b91ccb323a6a9e81fc138199bd78352ec4fa 100644 (file)
@@ -91,18 +91,27 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
                     "VolumeSize": volsize,
                     "VolumeType": "gp2"
                 }}]
+        if size.preemptable:
+            # Request a Spot instance for this node
+            kw['ex_spot_market'] = True
         return kw
 
     def sync_node(self, cloud_node, arvados_node):
         self.real.ex_create_tags(cloud_node,
                                  {'Name': arvados_node_fqdn(arvados_node)})
 
+    def create_node(self, size, arvados_node):
+        # Set up a tag indicating the Arvados-assigned cloud size id.
+        self.create_kwargs['ex_metadata'].update({'arvados_node_size': size.id})
+        return super(ComputeNodeDriver, self).create_node(size, arvados_node)
+
     def list_nodes(self):
         # Need to populate Node.size
         nodes = super(ComputeNodeDriver, self).list_nodes()
         for n in nodes:
             if not n.size:
                 n.size = self.sizes[n.extra["instance_type"]]
+            n.extra['arvados_node_size'] = n.extra.get('tags', {}).get('arvados_node_size')
         return nodes
 
     @classmethod
index 3f1d575361a461f322e6475fab28b059d973e193..be39ecba6bf4b3cfb4ef6e0e5dd7c1168dc86ddd 100644 (file)
@@ -102,25 +102,27 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
                   'ex_disks_gce_struct': disks,
                   }
         result['ex_metadata'].update({
-                'arv-ping-url': self._make_ping_url(arvados_node),
-                'booted_at': time.strftime(ARVADOS_TIMEFMT, time.gmtime()),
-                'hostname': arvados_node_fqdn(arvados_node),
-                })
+            'arvados_node_size': size.id,
+            'arv-ping-url': self._make_ping_url(arvados_node),
+            'booted_at': time.strftime(ARVADOS_TIMEFMT, time.gmtime()),
+            'hostname': arvados_node_fqdn(arvados_node),
+        })
         return result
 
-
     def list_nodes(self):
         # The GCE libcloud driver only supports filtering node lists by zone.
         # Do our own filtering based on tag list.
         nodelist = [node for node in
                     super(ComputeNodeDriver, self).list_nodes()
                     if self.node_tags.issubset(node.extra.get('tags', []))]
-        # As of 0.18, the libcloud GCE driver sets node.size to the size's name.
-        # It's supposed to be the actual size object.  Check that it's not,
-        # and monkeypatch the results when that's the case.
-        if nodelist and not hasattr(nodelist[0].size, 'id'):
-            for node in nodelist:
+        for node in nodelist:
+            # As of 0.18, the libcloud GCE driver sets node.size to the size's name.
+            # It's supposed to be the actual size object.  Check that it's not,
+            # and monkeypatch the results when that's the case.
+            if not hasattr(node.size, 'id'):
                 node.size = self._sizes_by_id[node.size]
+            # Get arvados-assigned cloud size id
+            node.extra['arvados_node_size'] = node.extra.get('metadata', {}).get('arvados_node_size')
         return nodelist
 
     @classmethod
index e47f9fcb1d036b78f94af0af25e8c37dc17b5ad0..f9724a8fccbcbc91f63547cb5039c5ae0fff997f 100644 (file)
@@ -17,6 +17,7 @@ from apiclient import errors as apierror
 
 from .baseactor import BaseNodeManagerActor
 
+from functools import partial
 from libcloud.common.types import LibcloudError
 from libcloud.common.exceptions import BaseHTTPError
 
@@ -69,12 +70,23 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                 if not self.has_option(sec_name, opt_name):
                     self.set(sec_name, opt_name, value)
 
-    def get_section(self, section, transformer=None):
+    def get_section(self, section, transformers={}, default_transformer=None):
+        transformer_map = {
+            str: self.get,
+            int: self.getint,
+            bool: self.getboolean,
+            float: self.getfloat,
+        }
         result = self._dict()
         for key, value in self.items(section):
+            transformer = None
+            if transformers.get(key) in transformer_map:
+                transformer = partial(transformer_map[transformers[key]], section)
+            elif default_transformer in transformer_map:
+                transformer = partial(transformer_map[default_transformer], section)
             if transformer is not None:
                 try:
-                    value = transformer(value)
+                    value = transformer(key)
                 except (TypeError, ValueError):
                     pass
             result[key] = value
@@ -128,31 +140,41 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                                         self.get_section('Cloud Create'),
                                         driver_class=driver_class)
 
-    def node_sizes(self, all_sizes):
+    def node_sizes(self):
         """Finds all acceptable NodeSizes for our installation.
 
         Returns a list of (NodeSize, kwargs) pairs for each NodeSize object
         returned by libcloud that matches a size listed in our config file.
         """
-
+        all_sizes = self.new_cloud_client().list_sizes()
         size_kwargs = {}
+        section_types = {
+            'instance_type': str,
+            'price': float,
+            'preemptable': bool,
+        }
         for sec_name in self.sections():
             sec_words = sec_name.split(None, 2)
             if sec_words[0] != 'Size':
                 continue
-            size_spec = self.get_section(sec_name, int)
-            if 'price' in size_spec:
-                size_spec['price'] = float(size_spec['price'])
+            size_spec = self.get_section(sec_name, section_types, int)
+            if 'preemptable' not in size_spec:
+                size_spec['preemptable'] = False
+            if 'instance_type' not in size_spec:
+                # Assume instance type is Size name if missing
+                size_spec['instance_type'] = sec_words[1]
+            size_spec['id'] = sec_words[1]
             size_kwargs[sec_words[1]] = size_spec
         # EC2 node sizes are identified by id. GCE sizes are identified by name.
         matching_sizes = []
         for size in all_sizes:
-            if size.id in size_kwargs:
-                matching_sizes.append((size, size_kwargs[size.id]))
-            elif size.name in size_kwargs:
-                matching_sizes.append((size, size_kwargs[size.name]))
+            matching_sizes += [
+                (size, size_kwargs[s]) for s in size_kwargs
+                if size_kwargs[s]['instance_type'] == size.id
+                or size_kwargs[s]['instance_type'] == size.name
+            ]
         return matching_sizes
 
     def shutdown_windows(self):
-        return [int(n)
+        return [float(n)
                 for n in self.get('Cloud', 'shutdown_windows').split(',')]
index 1b9f1e70ccc7cbf7b85b37f74e865d3e8a81964d..911798e08f937ded2d10e30b8b8fe7d64edd8f6b 100644 (file)
@@ -318,7 +318,7 @@ class NodeManagerDaemonActor(actor_class):
         busy_count = counts["busy"]
         wishlist_count = self._size_wishlist(size)
 
-        self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
+        self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.id,
                           wishlist_count,
                           up_count,
                           counts["booting"],
@@ -338,7 +338,7 @@ class NodeManagerDaemonActor(actor_class):
             can_boot = int((self.max_total_price - total_price) / size.price)
             if can_boot == 0:
                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
-                                  size.name, size.price, self.max_total_price, total_price)
+                                  size.id, size.price, self.max_total_price, total_price)
             return can_boot
         else:
             return wanted
@@ -392,7 +392,7 @@ class NodeManagerDaemonActor(actor_class):
             return None
         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
         self._logger.info("Want %i more %s nodes.  Booting a node.",
-                          nodes_wanted, cloud_size.name)
+                          nodes_wanted, cloud_size.id)
         new_setup = self._node_setup.start(
             timer_actor=self._timer,
             arvados_client=self._new_arvados(),
index 90b32290b76932fa93dbb1ff0854aeb2219eaf4c..99064b3988f10f72738d2e3a1699c3c1f2ab8911 100644 (file)
@@ -24,6 +24,26 @@ class ServerCalculator(object):
     that would best satisfy the jobs, choosing the cheapest size that
     satisfies each job, and ignoring jobs that can't be satisfied.
     """
+    class InvalidCloudSize(object):
+        """
+        Dummy CloudSizeWrapper-like class, to be used when a cloud node doesn't
+        have a recognizable arvados_node_size tag.
+        """
+        def __init__(self):
+            self.id = 'invalid'  # sentinel id; find_size() returns an instance of this class when no configured size matches
+            self.name = 'invalid'
+            self.ram = 0
+            self.disk = 0
+            self.scratch = 0
+            self.cores = 0
+            self.bandwidth = 0
+            self.price = 9999999  # NOTE(review): presumably a prohibitive price so this size is never preferred - confirm
+            self.preemptable = False
+            self.extra = {}
+
+        def meets_constraints(self, **kwargs):
+            return False  # never satisfies a job, whatever constraints are passed
+
 
     class CloudSizeWrapper(object):
         def __init__(self, real_size, node_mem_scaling, **kwargs):
@@ -38,7 +58,9 @@ class ServerCalculator(object):
                 self.disk = 0
             self.scratch = self.disk * 1000
             self.ram = int(self.ram * node_mem_scaling)
+            self.preemptable = False
             for name, override in kwargs.iteritems():
+                if name == 'instance_type': continue
                 if not hasattr(self, name):
                     raise ValueError("unrecognized size field '%s'" % (name,))
                 setattr(self, name, override)
@@ -80,10 +102,12 @@ class ServerCalculator(object):
         wants = {'cores': want_value('min_cores_per_node'),
                  'ram': want_value('min_ram_mb_per_node'),
                  'scratch': want_value('min_scratch_mb_per_node')}
+        # EC2 node sizes are identified by id. GCE sizes are identified by name.
         for size in self.cloud_sizes:
             if (size.meets_constraints(**wants) and
-                (specified_size is None or size.id == specified_size)):
-                    return size
+                (specified_size is None or
+                    size.id == specified_size or size.name == specified_size)):
+                        return size
         return None
 
     def servers_for_queue(self, queue):
@@ -101,7 +125,7 @@ class ServerCalculator(object):
                     "Job's min_nodes constraint is greater than the configured "
                     "max_nodes (%d)" % self.max_nodes)
             elif (want_count*cloud_size.price <= self.max_price):
-                servers.extend([cloud_size.real] * want_count)
+                servers.extend([cloud_size] * want_count)
             else:
                 unsatisfiable_jobs[job['uuid']] = (
                     "Job's price (%d) is above system's max_price "
@@ -115,7 +139,7 @@ class ServerCalculator(object):
         for s in self.cloud_sizes:
             if s.id == sizeid:
                 return s
-        return None
+        return self.InvalidCloudSize()
 
 
 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
@@ -224,5 +248,5 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
                                    job_uuid,
                                    error)
         self._logger.debug("Calculated wishlist: %s",
-                           ', '.join(s.name for s in server_list) or "(empty)")
+                           ', '.join(s.id for s in server_list) or "(empty)")
         return super(JobQueueMonitorActor, self)._got_response(server_list)
index 888abf5a768d51cb34fe85b30ed9d1252b7dea4c..f65e0806ec56df96f81c5bed87f657b15355fdec 100644 (file)
@@ -71,7 +71,7 @@ def setup_logging(path, level, **sublevels):
     return root_logger
 
 def build_server_calculator(config):
-    cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes())
+    cloud_size_list = config.node_sizes()
     if not cloud_size_list:
         abort("No valid node sizes configured")
     return ServerCalculator(cloud_size_list,
@@ -80,7 +80,7 @@ def build_server_calculator(config):
                             config.getfloat('Daemon', 'node_mem_scaling'))
 
 def launch_pollers(config, server_calculator):
-    poll_time = config.getint('Daemon', 'poll_time')
+    poll_time = config.getfloat('Daemon', 'poll_time')
     max_poll_time = config.getint('Daemon', 'max_poll_time')
 
     timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
index 4b9d5b60fb0ce5131d865f4b3d97b0652afb88c8..66af7c32d128ab3a51815a74443b885779052f6b 100644 (file)
@@ -80,8 +80,8 @@ class CloudNodeListMonitorActor(clientactor.RemotePollLoopActor):
     def _send_request(self):
         nodes = self._client.list_nodes()
         for n in nodes:
-            # Replace with libcloud NodeSize object with compatible
+            # Replace the libcloud NodeSize object with compatible
             # CloudSizeWrapper object which merges the size info reported from
             # the cloud with size information from the configuration file.
-            n.size = self._calculator.find_size(n.size.id)
+            n.size = self._calculator.find_size(n.extra['arvados_node_size'])  # NOTE(review): KeyError if a node's extra lacks 'arvados_node_size' - confirm every driver's list_nodes sets it
         return nodes  # same list object; each node's .size has been replaced in place
index 5d033081213c5faa72dc33c656dd9c3167f140da..2a592f9ee7499924d5a02c83ed2b4931f0a1e6bf 100644 (file)
@@ -43,13 +43,16 @@ class FakeDriver(NodeDriver):
         global all_nodes, create_calls
         create_calls += 1
         nodeid = "node%i" % create_calls
+        if ex_tags is None:
+            ex_tags = {}
+        ex_tags.update({'arvados_node_size': size.id})
         n = Node(nodeid, nodeid, NodeState.RUNNING, [], [], self, size=size, extra={"tags": ex_tags})
         all_nodes.append(n)
         if ex_customdata:
             ping_url = re.search(r"echo '(.*)' > /var/tmp/arv-node-data/arv-ping-url", ex_customdata).groups(1)[0]
         if ex_userdata:
             ping_url = ex_userdata
-        if ex_metadata:
+        elif ex_metadata:
             ping_url = ex_metadata["arv-ping-url"]
         ping_url += "&instance_id=" + nodeid
         ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
@@ -130,10 +133,10 @@ class RetryDriver(FakeDriver):
         create_calls += 1
         if create_calls < 2:
             raise RateLimitReachedError(429, "Rate limit exceeded",
-                                        headers={'retry-after': '12'})
+                                        headers={'retry-after': '2'})
         elif create_calls < 3:
             raise BaseHTTPError(429, "Rate limit exceeded",
-                                {'retry-after': '2'})
+                                {'retry-after': '1'})
         else:
             return super(RetryDriver, self).create_node(name=name,
                     size=size,
@@ -161,7 +164,12 @@ class FakeAwsDriver(FakeDriver):
                                                       auth=auth,
                                                       ex_metadata=ex_metadata,
                                                       ex_userdata=ex_userdata)
-        n.extra = {"launch_time": time.strftime(ARVADOS_TIMEFMT, time.gmtime())[:-1]}
+        n.extra = {
+            "launch_time": time.strftime(ARVADOS_TIMEFMT, time.gmtime())[:-1],
+            "tags" : {
+                "arvados_node_size": size.id
+            }
+        }
         return n
 
     def list_sizes(self, **kwargs):
@@ -187,7 +195,8 @@ class FakeGceDriver(FakeDriver):
                                                    ex_metadata=ex_metadata)
         n.extra = {
             "metadata": {
-                "items": [{"key": k, "value": v} for k,v in ex_metadata.iteritems()]
+                "items": [{"key": k, "value": v} for k,v in ex_metadata.iteritems()],
+                "arvados_node_size": size.id
             },
             "zone": "fake"
         }
index 3b8502c0535ef14777af2e211162d7774714c27d..d94ceb2fa40a3e7689b76341573aa44155bb003a 100644 (file)
@@ -35,19 +35,22 @@ setup(name='arvados-node-manager',
           ('share/doc/arvados-node-manager', ['agpl-3.0.txt', 'README.rst']),
       ],
       install_requires=[
-          'apache-libcloud>=2.3',
+          'apache-libcloud>=2.3.1.dev1',
           'arvados-python-client>=0.1.20170731145219',
           'future',
           'pykka',
           'python-daemon',
           'setuptools'
       ],
+      dependency_links=[
+          "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.3.1.dev1.zip"
+      ],
       test_suite='tests',
       tests_require=[
           'requests',
           'pbr<1.7.0',
           'mock>=1.0',
-          'apache-libcloud>=2.3',
+          'apache-libcloud>=2.3.1.dev1',
       ],
       zip_safe=False
       )
index 01f053c3701c16885bd6930d4ab5c4590ba923cb..a11a6d807ef9348d9a17deac9e0c2092ed929f46 100644 (file)
@@ -38,16 +38,16 @@ max_nodes = 8
 max_total_price = 0
 
 # Poll Azure nodes and Arvados for new information every N seconds.
-poll_time = 5
+poll_time = 0.5
 
 # Polls have exponential backoff when services fail to respond.
 # This is the longest time to wait between polls.
-max_poll_time = 300
+max_poll_time = 1
 
 # If Node Manager can't successfully poll a service for this long,
 # it will never start or stop compute nodes, on the assumption that its
 # information is too outdated.
-poll_stale_after = 600
+poll_stale_after = 1
 
 # If Node Manager boots a cloud node, and it does not pair with an Arvados
 # node before this long, assume that there was a cloud bootstrap failure and
@@ -115,7 +115,7 @@ driver_class = {driver_class}
 # Azure bills by the minute, so it makes sense to aggressively shut down idle
 # nodes.  Specify at least two windows.  You can add as many as you need beyond
 # that.
-shutdown_windows = 1, 999999
+shutdown_windows = 0.05, 999999
 
 [Cloud Credentials]
 # Use "azure account list" with the azure CLI to get these values.
index 744d7f849bec3793c0dcd4db77624e013b7e4f25..2bb7d0ea0b64354a5cb5500a52f0ff05a1bcf304 100644 (file)
@@ -38,16 +38,16 @@ max_nodes = 8
 max_total_price = 0
 
 # Poll Azure nodes and Arvados for new information every N seconds.
-poll_time = 5
+poll_time = 0.5
 
 # Polls have exponential backoff when services fail to respond.
 # This is the longest time to wait between polls.
-max_poll_time = 300
+max_poll_time = 1
 
 # If Node Manager can't successfully poll a service for this long,
 # it will never start or stop compute nodes, on the assumption that its
 # information is too outdated.
-poll_stale_after = 600
+poll_stale_after = 1
 
 # If Node Manager boots a cloud node, and it does not pair with an Arvados
 # node before this long, assume that there was a cloud bootstrap failure and
@@ -115,7 +115,7 @@ driver_class = {driver_class}
 # Azure bills by the minute, so it makes sense to aggressively shut down idle
 # nodes.  Specify at least two windows.  You can add as many as you need beyond
 # that.
-shutdown_windows = 1, 999999
+shutdown_windows = 0.05, 999999
 
 [Cloud Credentials]
 
index 1c39ccf668519baa1e4a91330573844c4a5b28e6..11131efbc3fa845e17c822ecf2a22366c61b0d12 100644 (file)
@@ -38,16 +38,16 @@ max_nodes = 8
 max_total_price = 0
 
 # Poll Azure nodes and Arvados for new information every N seconds.
-poll_time = 5
+poll_time = 0.5
 
 # Polls have exponential backoff when services fail to respond.
 # This is the longest time to wait between polls.
-max_poll_time = 300
+max_poll_time = 1
 
 # If Node Manager can't successfully poll a service for this long,
 # it will never start or stop compute nodes, on the assumption that its
 # information is too outdated.
-poll_stale_after = 600
+poll_stale_after = 1
 
 # If Node Manager boots a cloud node, and it does not pair with an Arvados
 # node before this long, assume that there was a cloud bootstrap failure and
@@ -115,7 +115,7 @@ driver_class = {driver_class}
 # Azure bills by the minute, so it makes sense to aggressively shut down idle
 # nodes.  Specify at least two windows.  You can add as many as you need beyond
 # that.
-shutdown_windows = 1, 999999
+shutdown_windows = 0.05, 999999
 
 [Cloud Credentials]
 key = 00000000-0000-0000-0000-000000000000
index 1699b5739015864b92fc465f39acb8df43e8b6e3..a8429e1369b62c2bf456a4225e45a3e38a6343b2 100755 (executable)
@@ -106,18 +106,6 @@ def node_paired(g):
 
     return 0
 
-def remaining_jobs(g):
-    update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
-                  "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
-
-    for k,v in all_jobs.items():
-        all_jobs[k] = "Running"
-
-    set_squeue(g)
-
-    return 0
-
-
 def node_busy(g):
     update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
                   "\n".join("echo '%s|idle|(null)'" % (v) for k,v in compute_nodes.items()))
@@ -125,7 +113,8 @@ def node_busy(g):
 
 def node_shutdown(g):
     global compute_nodes
-    del compute_nodes[g.group(1)]
+    if g.group(1) in compute_nodes:  # skip names already removed (or never recorded) instead of raising KeyError
+        del compute_nodes[g.group(1)]
     return 0  # action results are added to run_test's error code, so 0 means "no failure"
 
 def jobs_req(g):
@@ -187,8 +176,8 @@ def run_test(name, actions, checks, driver_class, jobs, provider):
                                       driver_class=driver_class,
                                       ssh_key=os.path.join(fake_slurm, "id_rsa.pub")))
 
-    # Tests must complete in less than 3 minutes.
-    timeout = time.time() + 180
+    # Tests must complete in less than 30 seconds.
+    timeout = time.time() + 30
     terminated = False
 
     # Now start node manager
@@ -216,7 +205,7 @@ def run_test(name, actions, checks, driver_class, jobs, provider):
                     if code != 0:
                         detail.error("Check failed")
                         if not terminated:
-                            p.terminate()
+                            p.kill()
                             terminated = True
 
             if terminated:
@@ -226,7 +215,7 @@ def run_test(name, actions, checks, driver_class, jobs, provider):
                 detail.error("Exceeded timeout with actions remaining: %s", actions)
                 code += 1
                 if not terminated:
-                    p.terminate()
+                    p.kill()
                     terminated = True
 
             k, v = actions[0]
@@ -237,11 +226,11 @@ def run_test(name, actions, checks, driver_class, jobs, provider):
                 code += v(g)
                 if code != 0:
                     detail.error("Action failed")
-                    p.terminate()
+                    p.kill()
                     terminated = True
 
             if not actions:
-                p.terminate()
+                p.kill()
                 terminated = True
     except KeyboardInterrupt:
         p.kill()
@@ -333,7 +322,6 @@ def main():
             ],
             # Checks (things that shouldn't happen)
             {
-                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
                 r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 4),
                 r".*Setting node quota.*": fail,
             },
@@ -353,13 +341,12 @@ def main():
                 (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
                 (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
                 (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
-                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
                 (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
                 (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown)
             ],
             # Checks (things that shouldn't happen)
             {
-                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
                 r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 2),
                 r".*Sending create_node request.*": partial(expect_count, 5)
             },
@@ -379,7 +366,7 @@ def main():
                 (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
                 (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
                 (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
-                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
                 (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
                 (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
                 (r".*sending request", jobs_req),
@@ -396,7 +383,6 @@ def main():
             ],
             # Checks (things that shouldn't happen)
             {
-                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
                 r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 6),
                 r".*Sending create_node request.*": partial(expect_count, 9)
             },
@@ -433,8 +419,8 @@ def main():
             # Actions (pattern -> action)
             [
                 (r".*Daemon started", set_squeue),
-                (r".*Rate limit exceeded - scheduling retry in 12 seconds", noop),
                 (r".*Rate limit exceeded - scheduling retry in 2 seconds", noop),
+                (r".*Rate limit exceeded - scheduling retry in 1 seconds", noop),
                 (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", noop),
             ],
             # Checks (things that shouldn't happen)
@@ -456,7 +442,6 @@ def main():
             ],
             # Checks (things that shouldn't happen)
             {
-                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
                 r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
                 r".*Setting node quota.*": fail,
             },
@@ -477,7 +462,6 @@ def main():
             ],
             # Checks (things that shouldn't happen)
             {
-                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
                 r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
                 r".*Setting node quota.*": fail,
             },
index 3f11ff6c2b22d02a47b8f8e9a6bfe246479d10ed..898112bdd8a8c8df86bf80e0a4af1aac3c592ec8 100644 (file)
@@ -37,3 +37,9 @@ class ShutdownTimerTestCase(unittest.TestCase):
         time_mock.return_value += 200
         self.assertEqual(961, timer.next_opening())
         self.assertFalse(timer.window_open())
+
+
+class ArvadosTimestamp(unittest.TestCase):
+    def test_arvados_timestamp(self):
+        self.assertEqual(1527710178, cnode.arvados_timestamp('2018-05-30T19:56:18Z'))  # whole-second timestamp
+        self.assertEqual(1527710178.999371, cnode.arvados_timestamp('2018-05-30T19:56:18.999371Z'))  # fractional seconds kept; NOTE(review): exact float equality - assertAlmostEqual may be more robust
index 5775aa659a31391f13a5071929d9f5562ba3969d..778c9aeaf5ffdbbcecaf90ac8072ace7210ce4a5 100644 (file)
@@ -426,6 +426,15 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
         self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT),
                           (False, "node state is ('unpaired', 'open', 'boot wait', 'idle exceeded')"))
 
+    def test_shutdown_when_invalid_cloud_node_size(self):
+        self.make_mocks(1)
+        self.cloud_mock.size.id = 'invalid'  # same id that ServerCalculator.InvalidCloudSize carries
+        self.cloud_mock.extra['arvados_node_size'] = 'stale.type'  # tag value echoed in the expected reason string below
+        self.make_actor()
+        self.shutdowns._set_state(True, 600)
+        self.assertEquals((True, "node's size tag 'stale.type' not recognizable"),
+                          self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+
     def test_shutdown_without_arvados_node(self):
         self.make_actor(start_time=0)
         self.shutdowns._set_state(True, 600)
index 128a29e28d24ba4d5f3f8aae1bc535c9c60af043..4bf4c39efbc45ea069ea91ca3c0e94108d9b248b 100644 (file)
@@ -80,7 +80,7 @@ class ComputeNodeDriverTestCase(unittest.TestCase):
         for an_error, is_cloud_error in errors:
             self.driver_mock().create_node.side_effect = an_error
             with self.assertRaises(an_error):
-                driver.create_node('1', 'id_1')
+                driver.create_node(testutil.MockSize(1), 'id_1')
             if is_cloud_error:
                 error_count += 1
             self.assertEqual(error_count, status.tracker.get('create_node_errors'))
index ce96a8040d83a9a091d2d3331bec1d4275d4a974..ea7a033f0b5f3934f55e15a7f3e15aaf4f279246 100644 (file)
@@ -44,14 +44,25 @@ class AzureComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase
         self.assertIn('ping_secret=ssshh',
                       create_method.call_args[1].get('ex_tags', {}).get('arv-ping-url', ""))
 
+    def test_create_includes_arvados_node_size(self):
+        arv_node = testutil.arvados_node_mock()
+        arv_node["hostname"] = None
+        size = testutil.MockSize(1)
+        driver = self.new_driver()
+        driver.create_node(size, arv_node)
+        create_method = self.driver_mock().create_node
+        self.assertTrue(create_method.called)
+        self.assertIn(
+            ('arvados_node_size', size.id),  # the size id must be passed to the cloud call as a tag
+            create_method.call_args[1].get('ex_tags', {'tags': 'missing'}).items()  # call_args[1] is the kwargs dict; the default makes a missing ex_tags fail readably
+        )
+
     def test_name_from_new_arvados_node(self):
         arv_node = testutil.arvados_node_mock(hostname=None)
         driver = self.new_driver()
         self.assertEqual('compute-000000000000063-zzzzz',
                          driver.arvados_create_kwargs(testutil.MockSize(1), arv_node)['name'])
 
-
-
     def check_node_tagged(self, cloud_node, expected_tags):
         tag_mock = self.driver_mock().ex_create_tags
         self.assertTrue(tag_mock.called)
@@ -91,6 +102,14 @@ echo z1.test > /var/tmp/arv-node-data/meta-data/instance-type
 """,
                          driver.arvados_create_kwargs(testutil.MockSize(1), arv_node)['ex_customdata'])
 
+    def test_list_nodes_ignores_nodes_without_tags(self):
+        driver = self.new_driver(create_kwargs={"tag_arvados-class": "dynamic-compute"})
+        # Mock cloud node without tags
+        nodelist = [testutil.cloud_node_mock(1)]
+        self.driver_mock().list_nodes.return_value = nodelist
+        n = driver.list_nodes()
+        self.assertEqual([], n)  # the untagged node must not appear in the driver's listing
+
     def test_create_raises_but_actually_succeeded(self):
         arv_node = testutil.arvados_node_mock(1, hostname=None)
         driver = self.new_driver(create_kwargs={"tag_arvados-class": "dynamic-compute"})
index 297eac0ef3baade9abcca3691009628f6c3647c2..9442a8c240256d09d94933b1bfaeef94494179c5 100644 (file)
@@ -56,9 +56,32 @@ class EC2ComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase):
         driver.create_node(testutil.MockSize(1), arv_node)
         create_method = self.driver_mock().create_node
         self.assertTrue(create_method.called)
+        self.assertIn(
+            ('test', 'testvalue'),
+            create_method.call_args[1].get('ex_metadata', {'arg': 'missing'}).items()
+        )
+
+    def test_create_includes_arvados_node_size(self):
+        arv_node = testutil.arvados_node_mock()
+        size = testutil.MockSize(1)
+        driver = self.new_driver()
+        driver.create_node(size, arv_node)
+        create_method = self.driver_mock().create_node
+        self.assertTrue(create_method.called)
+        self.assertIn(
+            ('arvados_node_size', size.id),  # the size id must be passed to the cloud call in ex_metadata
+            create_method.call_args[1].get('ex_metadata', {'arg': 'missing'}).items()  # call_args[1] is the kwargs dict of the last call
+        )
+
+    def test_create_preemptable_instance(self):
+        arv_node = testutil.arvados_node_mock()
+        driver = self.new_driver()
+        driver.create_node(testutil.MockSize(1, preemptable=True), arv_node)
+        create_method = self.driver_mock().create_node
+        self.assertTrue(create_method.called)
         self.assertEqual(
-            {'test':'testvalue'},
-            create_method.call_args[1].get('ex_metadata', {'arg': 'missing'})
+            True,
+            create_method.call_args[1].get('ex_spot_market', 'arg missing')
         )
 
     def test_hostname_from_arvados_node(self):
index f0942e93785571f8ae4e3cdb7f0c78eb173ee7b6..1446cd2fdae559171af4c93535e591f22840290d 100644 (file)
@@ -51,6 +51,17 @@ class GCEComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase):
         metadata = self.driver_mock().create_node.call_args[1]['ex_metadata']
         self.assertIn('ping_secret=ssshh', metadata.get('arv-ping-url'))
 
+    def test_create_includes_arvados_node_size(self):
+        arv_node = testutil.arvados_node_mock()
+        size = testutil.MockSize(1)
+        driver = self.new_driver()
+        driver.create_node(size, arv_node)
+        create_method = self.driver_mock().create_node
+        self.assertIn(
+            ('arvados_node_size', size.id),  # the size id must be passed to the cloud call in ex_metadata
+            create_method.call_args[1].get('ex_metadata', {'metadata':'missing'}).items()  # call_args[1] is the kwargs dict of the last call
+        )
+
     def test_create_raises_but_actually_succeeded(self):
         arv_node = testutil.arvados_node_mock(1, hostname=None)
         driver = self.new_driver()
index 921281bc517bd7b6bc41935193c0b8562395a6da..9a48c7cda971b1fd1f7ed14bbfa8ef15a8e5e99d 100644 (file)
@@ -29,6 +29,12 @@ creds = dummy_creds
 cores = 1
 price = 0.8
 
+[Size 1.preemptable]
+instance_type = 1
+preemptable = true
+cores = 1
+price = 0.8
+
 [Logging]
 file = /dev/null
 level = DEBUG
@@ -53,13 +59,25 @@ testlogger = INFO
 
     def test_list_sizes(self):
         config = self.load_config()
-        client = config.new_cloud_client()
-        sizes = config.node_sizes(client.list_sizes())
-        self.assertEqual(1, len(sizes))
+        sizes = config.node_sizes()
+        self.assertEqual(2, len(sizes))
         size, kwargs = sizes[0]
         self.assertEqual('Small', size.name)
         self.assertEqual(1, kwargs['cores'])
         self.assertEqual(0.8, kwargs['price'])
+        # preemptable is False by default
+        self.assertEqual(False, kwargs['preemptable'])
+        # instance_type == arvados node size id by default
+        self.assertEqual(kwargs['id'], kwargs['instance_type'])
+        # Now retrieve the preemptable version
+        size, kwargs = sizes[1]
+        self.assertEqual('Small', size.name)
+        self.assertEqual('1.preemptable', kwargs['id'])
+        self.assertEqual(1, kwargs['cores'])
+        self.assertEqual(0.8, kwargs['price'])
+        self.assertEqual(True, kwargs['preemptable'])
+        self.assertEqual('1', kwargs['instance_type'])
+
 
     def test_default_node_mem_scaling(self):
         config = self.load_config()
index 8050e6981411d69f127617e0cb2b44681470341d..d09cbf72359610ac08afa428e39f024d3086835c 100644 (file)
@@ -17,11 +17,24 @@ from arvnodeman.jobqueue import ServerCalculator
 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
 from . import testutil
 from . import test_status
+from . import pykka_timeout
 import logging
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
 
+    def assertwait(self, f, timeout=pykka_timeout*2):  # retry f() until it stops raising AssertionError or `timeout` seconds pass
+        deadline = time.time() + timeout
+        while True:
+            try:
+                return f()  # success: hand back whatever f() returned
+            except AssertionError:
+                if time.time() > deadline:
+                    raise  # deadline passed: surface the most recent AssertionError
+                pass  # redundant no-op; control falls through to the sleep below
+            time.sleep(.1)
+            self.daemon.ping().get(self.TIMEOUT)  # NOTE(review): presumably lets the daemon actor process pending messages before the retry - confirm
+
     def busywait(self, f):
         for n in xrange(200):
             ok = f()
@@ -146,8 +159,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertIn('node_quota', status.tracker._latest)
 
     def check_monitors_arvados_nodes(self, *arv_nodes):
-        self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
-        self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
+        self.assertwait(lambda: self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes()))
 
     def test_node_pairing(self):
         cloud_node = testutil.cloud_node_mock(1)
@@ -257,7 +269,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-        self.busywait(lambda: 2 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -269,7 +281,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-        self.busywait(lambda: 2 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         get_cloud_node = mock.MagicMock(name="get_cloud_node")
         get_cloud_node.get.return_value = cloud_nodes[1]
         mock_node_monitor = mock.MagicMock()
@@ -278,7 +290,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
 
-        self.busywait(lambda: 2 == self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.alive_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
@@ -298,8 +310,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
                          [size], avail_sizes=[(size, {"cores":1})])
-        self.busywait(lambda: 1 == self.paired_monitor_count())
-        self.busywait(lambda: self.node_setup.start.called)
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
+        self.assertwait(lambda: self.assertEqual(1, self.node_setup.start.called))
 
     def test_boot_new_node_below_min_nodes(self):
         min_size = testutil.MockSize(1)
@@ -543,7 +555,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(1)
         size = testutil.MockSize(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -553,7 +565,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -572,7 +584,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                      testutil.arvados_node_mock(4, job_uuid=None)]
         self.make_daemon(cloud_nodes, arv_nodes, [size])
-        self.busywait(lambda: 2 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             monitor = mon_ref.proxy()
             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
@@ -591,13 +603,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = False
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
 
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = True
         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.busywait(lambda: 0 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(0, self.paired_monitor_count()))
 
     def test_nodes_shutting_down_replaced_below_max_nodes(self):
         size = testutil.MockSize(6)
@@ -616,7 +628,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(7)
         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                          max_nodes=1)
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.assertTrue(self.node_shutdown.start.called)
@@ -630,7 +642,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
         self.make_daemon(cloud_nodes, arv_nodes, [size],
                          avail_sizes=[(size, {"cores":1})])
-        self.busywait(lambda: 2 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -671,7 +683,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_nodes = [testutil.cloud_node_mock(1, size=size)]
         arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)]
         self.make_daemon(cloud_nodes, arv_nodes, [size])
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             monitor = mon_ref.proxy()
             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
@@ -770,7 +782,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                         testutil.arvados_node_mock(3)],
                          want_sizes=[small, small, big],
                          avail_sizes=avail_sizes)
-        self.busywait(lambda: 3 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(3, self.paired_monitor_count()))
         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
 
         self.assertEqual(0, self.node_shutdown.start.call_count)
index 2d1a17eaecd82a7cea0be0103d27fc2e8f07c4c5..8bf3ea87412200a595d70d02c34796f75a2a8543 100644 (file)
@@ -48,9 +48,12 @@ class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
     def test_nonfatal_error(self):
         status.tracker.update({'actor_exceptions': 0})
         kill_mock = mock.Mock('os.kill')
-        act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
+        bgact = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock)
+        act_thread = bgact.proxy().get_thread().get()
+        act = bgact.tell_proxy()
         act.doStuff()
         act.actor_ref.stop(block=True)
+        act_thread.join()
         self.assertFalse(kill_mock.called)
         self.assertEqual(1, status.tracker.get('actor_exceptions'))
 
index 5becd0c2241386e34b6dfef8e57a29b025335a67..b087325c6f702347d68bd68983793b9ab3536787 100644 (file)
@@ -84,6 +84,7 @@ class CloudNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
             self.public_ips = []
             self.size = testutil.MockSize(1)
             self.state = 0
+            self.extra = {'arvados_node_size': self.size.id}
 
 
     def build_monitor(self, side_effect, *args, **kwargs):
index 555144c4d05d2bc562d9bc2357fa93421f64b35f..2ec13c0b8bbdb00d375b70cccb7fd31f31d66fe6 100644 (file)
@@ -78,7 +78,7 @@ class MockShutdownTimer(object):
 
 
 class MockSize(object):
-    def __init__(self, factor):
+    def __init__(self, factor, preemptable=False):
         self.id = 'z{}.test'.format(factor)
         self.name = 'test size '+self.id
         self.ram = 128 * factor
@@ -87,6 +87,8 @@ class MockSize(object):
         self.bandwidth = 16 * factor
         self.price = float(factor)
         self.extra = {}
+        self.real = self
+        self.preemptable = preemptable
 
     def __eq__(self, other):
         return self.id == other.id
index fea9cf6e8beeda2dcec9f15f88d759303286f173..35a8b156dec7cb650015b9e22155a16a18177615 100755 (executable)
@@ -1,4 +1,8 @@
 #!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 # system one time tasks
 
 PATH=/command:/sbin:/bin:/usr/sbin:/usr/bin
index 6b092eae678f0b8f825331b9140842d3bb7a8639..5812f3d8b0cea307b793016156b8fa73b3909224 100755 (executable)
@@ -1,4 +1,7 @@
 #!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
 
 PATH=/usr/local/bin:/usr/local/sbin:/bin:/sbin:/usr/bin:/usr/sbin:/usr/X11R6/bin
 
index 525b96bbb667564ecf7f2eef66d37009796a7406..242c035f66dcbb6d865d52bc421ec91881a093fb 100755 (executable)
@@ -1,4 +1,8 @@
 #!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 exec 2>&1
 
 PATH=/command:/sbin:/bin:/usr/sbin:/usr/bin
index 02bb2ea91884d110c8ad1add1cf7cab87ce30304..d4d2190b152d284c55bc5b4bdd988787be2d7f69 100755 (executable)
@@ -1,4 +1,8 @@
 #!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 exec 2>&1
 
 PATH=/command:/sbin:/bin:/usr/sbin:/usr/bin
index 78c8d4278f32b2050ff006d168cf61213804645a..562ee839e0832c9b6d394c00f62c9a524f89d30a 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (c) 2009 Dan Vanderkam. All rights reserved.
+//
+// SPDX-License-Identifier: MIT
+
 /**
  * Synchronize zooming and/or selections between a set of dygraphs.
  *
@@ -31,7 +35,6 @@
  * You may also set `range: false` if you wish to only sync the x-axis.
  * The `range` option has no effect unless `zoom` is true (the default).
  *
- * SPDX-License-Identifier: MIT
  * Original source: https://github.com/danvk/dygraphs/blob/master/src/extras/synchronizer.js
  * at commit b55a71d768d2f8de62877c32b3aec9e9975ac389
  *