14360: Merge branch 'master'
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 15 Nov 2018 21:24:48 +0000 (16:24 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Thu, 15 Nov 2018 21:24:48 +0000 (16:24 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

52 files changed:
apps/workbench/Gemfile.lock
build/package-build-dockerfiles/debian9/Dockerfile
build/package-test-dockerfiles/debian9/Dockerfile
build/run-tests.sh
doc/_config.yml
doc/admin/federation.html.textile.liquid [new file with mode: 0644]
doc/admin/upgrading.html.textile.liquid
doc/architecture/federation.html.textile.liquid
lib/controller/fed_collections.go [new file with mode: 0644]
lib/controller/fed_containers.go [new file with mode: 0644]
lib/controller/fed_generic.go [new file with mode: 0644]
lib/controller/federation.go
lib/controller/federation_test.go
lib/controller/handler.go
lib/controller/handler_test.go
lib/controller/proxy.go
sdk/go/arvados/api_client_authorization.go
sdk/go/arvados/container.go
sdk/go/arvados/fs_backend.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvadostest/fixtures.go
sdk/go/httpserver/id_generator.go
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/keep.py
sdk/python/tests/test_collections.py
sdk/python/tests/test_keep_client.py
services/api/Gemfile.lock
services/api/app/controllers/application_controller.rb
services/api/app/mailers/user_notifier.rb
services/api/app/models/collection.rb
services/api/app/models/container.rb
services/api/test/functional/arvados/v1/users_controller_test.rb
services/api/test/integration/remote_user_test.rb
services/api/test/unit/container_request_test.rb
services/api/test/unit/container_test.rb
services/api/test/unit/job_test.rb
services/api/test/unit/user_notifier_test.rb
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/keep-balance/keep-balance.service
services/keep-web/cadaver_test.go
services/keep-web/handler.go
services/keep-web/webdav.go
services/keepstore/proxy_remote.go
services/keepstore/s3_volume.go
services/keepstore/trash_worker.go
tools/arvbox/bin/arvbox
vendor/vendor.json

diff --git a/apps/workbench/Gemfile.lock b/apps/workbench/Gemfile.lock
index 42a321125ec14c861f7c1cc92697e12928517082..3ec8f9d908b0a10b658dc6a3dc24e73a2d40cb9f 100644 (file)
@@ -156,7 +156,7 @@ GEM
       railties (>= 4)
       request_store (~> 1.0)
     logstash-event (1.2.02)
-    loofah (2.2.2)
+    loofah (2.2.3)
       crass (~> 1.0.2)
       nokogiri (>= 1.5.9)
     mail (2.7.0)
@@ -182,7 +182,7 @@ GEM
     net-ssh (4.2.0)
     net-ssh-gateway (2.0.0)
       net-ssh (>= 4.0.0)
-    nokogiri (1.8.2)
+    nokogiri (1.8.5)
       mini_portile2 (~> 2.3.0)
     npm-rails (0.2.1)
       rails (>= 3.2)
diff --git a/build/package-build-dockerfiles/debian9/Dockerfile b/build/package-build-dockerfiles/debian9/Dockerfile
index 0323e041369e1aca90dd88333f888818acee5478..42094d53bb8a4d982afce90b5a65bd28dd3fd763 100644 (file)
@@ -13,7 +13,7 @@ RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev pyth
 
 # Install RVM
 ADD generated/rvm.asc /tmp/
-RUN gpg --import /tmp/rvm.asc && \
+RUN gpg --no-tty --import /tmp/rvm.asc && \
     curl -L https://get.rvm.io | bash -s stable && \
     /usr/local/rvm/bin/rvm install 2.3 && \
     /usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
diff --git a/build/package-test-dockerfiles/debian9/Dockerfile b/build/package-test-dockerfiles/debian9/Dockerfile
index 0bb7019fce2af0841bbd53cdeb2e52919399068f..c682ccc7cb05f84fbe68d56c60a4846c3448fe69 100644 (file)
@@ -13,7 +13,7 @@ RUN apt-get update && \
 
 # Install RVM
 ADD generated/rvm.asc /tmp/
-RUN gpg --import /tmp/rvm.asc && \
+RUN gpg --no-tty --import /tmp/rvm.asc && \
     curl -L https://get.rvm.io | bash -s stable && \
     /usr/local/rvm/bin/rvm install 2.3 && \
     /usr/local/rvm/bin/rvm alias create default ruby-2.3
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 1b42ea21c4cc0ae7ae44c84ad2cb8ecc52325c77..cb44372566f8eb36ec6163f9108c97e464d6fca8 100755 (executable)
@@ -693,7 +693,7 @@ do_test_once() {
         # before trying "go test". Otherwise, coverage-reporting
         # mode makes Go show the wrong line numbers when reporting
         # compilation errors.
-        go get -t "git.curoverse.com/arvados.git/$1" && \
+        go get -ldflags "-X main.version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}" -t "git.curoverse.com/arvados.git/$1" && \
             cd "$GOPATH/src/git.curoverse.com/arvados.git/$1" && \
             [[ -z "$(gofmt -e -d . | tee /dev/stderr)" ]] && \
             if [[ -n "${testargs[$1]}" ]]
@@ -761,7 +761,7 @@ do_install_once() {
     timer_reset
     if [[ "$2" == "go" ]]
     then
-        go get -t "git.curoverse.com/arvados.git/$1"
+        go get -ldflags "-X main.version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}" -t "git.curoverse.com/arvados.git/$1"
     elif [[ "$2" == "pip" ]]
     then
         # $3 can name a path directory for us to use, including trailing
diff --git a/doc/_config.yml b/doc/_config.yml
index 017aa580d431476c39abe0892fbcfdb937c539c0..21260f761282adb57d3a90cf15635d1f16bc0d4e 100644 (file)
@@ -169,6 +169,7 @@ navbar:
       - admin/spot-instances.html.textile.liquid
     - Other:
       - admin/collection-versioning.html.textile.liquid
+      - admin/federation.html.textile.liquid
   installguide:
     - Overview:
       - install/index.html.textile.liquid
diff --git a/doc/admin/federation.html.textile.liquid b/doc/admin/federation.html.textile.liquid
new file mode 100644 (file)
index 0000000..3728507
--- /dev/null
@@ -0,0 +1,74 @@
+---
+layout: default
+navsection: admin
+title: Configuring federation
+...
+
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+This page describes how to enable and configure federation capabilities between clusters.
+
+An overview of how this feature works is discussed in the "architecture section":{{site.baseurl}}/architecture/federation.html
+
+h3. API Server configuration
+
+To accept users from remote clusters, some settings need to be added to the @application.yml@ file. There are two ways in which a remote cluster can be identified: either explicitly, by listing its prefix-to-hostname mapping, or implicitly, by assuming the given remote cluster is public and belongs to the @.arvadosapi.com@ subdomain.
+
+For example, if you want to set up a private cluster federation, the following configuration will only allow access to users from @clsr2@ & @clsr3@:
+
+<pre>
+production:
+  remote_hosts:
+    clsr2: api.cluster2.com
+    clsr3: api.cluster3.com
+  remote_hosts_via_dns: false
+  auto_activate_users_from: []
+</pre>
+
+The additional @auto_activate_users_from@ setting can be used to allow users from the clusters in the federation to not only read but also create & update objects on the local cluster. This feature is covered in more detail in the "user activation section":{{site.baseurl}}/admin/activation.html. In the current example, only manually activated remote users would have full access to the local cluster.
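+
+For example, to let the federation auto-activate remote users coming from @clsr2@ (a hypothetical choice for this setup), the setting would become:
+
+<pre>
+  auto_activate_users_from: [clsr2]
+</pre>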
+
+h3. Arvados controller & keepstores configuration
+
+Both @arvados-controller@ and @keepstore@ services also need to be configured, as they proxy requests to remote clusters when needed.
+
+Continuing the previous example, the necessary settings should be added to the @/etc/arvados/config.yml@ file as follows:
+
+<pre>
+Clusters:
+  clsr1:
+    RemoteClusters:
+      clsr2:
+        Host: api.cluster2.com
+        Proxy: true
+      clsr3:
+        Host: api.cluster3.com
+        Proxy: true
+</pre>
+
+Similar settings should be added to @clsr2@ & @clsr3@ hosts, so that all clusters in the federation can talk to each other.
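+
+For instance, @clsr2@'s @/etc/arvados/config.yml@ would mirror the above (hostnames here are illustrative):
+
+<pre>
+Clusters:
+  clsr2:
+    RemoteClusters:
+      clsr1:
+        Host: api.cluster1.com
+        Proxy: true
+      clsr3:
+        Host: api.cluster3.com
+        Proxy: true
+</pre>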
+
+h3. Testing
+
+Following the above example, let's suppose @clsr1@ is our "home cluster", that is to say, we use our @clsr1@ user account as our federated identity, and that both the @clsr2@ and @clsr3@ remote clusters are set up to allow users from @clsr1@ and to auto-activate them. The first thing to do is log into a remote workbench using the local user token, following these steps:
+
+1. Log into the local workbench and get the user token
+2. Visit the remote workbench, passing the local user token in the URL: @https://workbench.cluster2.com?api_token=token_from_clsr1@
+3. You should now be logged into @clsr2@ with your account from @clsr1@
+
+To test the federation setup further, create a collection on @clsr2@, upload some files, and copy its UUID. Then, while logged into a shell node on your home cluster, you should be able to retrieve that collection by running:
+
+<pre>
+user@clsr1:~$ arv collection get --uuid clsr2-xvhdp-xxxxxxxxxxxxxxx
+</pre>
+
+The returned collection metadata should show the local user's uuid in the @owner_uuid@ field. This confirms that the @arvados-controller@ service is proxying requests correctly.
+
+One last test confirms that the @keepstore@ services also recognize remote cluster prefixes and proxy requests to the remote cluster. Retrieve a file from the previously created collection using any of the usual tools, for example:
+
+<pre>
+user@clsr1:~$ arv-get clsr2-xvhdp-xxxxxxxxxxxxxxx/uploaded_file .
+</pre>
diff --git a/doc/admin/upgrading.html.textile.liquid b/doc/admin/upgrading.html.textile.liquid
index 55f39f7d848356714b3190a6c3addc07168167dc..15667741fda9256c5e9c73e99e2e01f2044a61a2 100644 (file)
@@ -30,13 +30,43 @@ Note to developers: Add new items at the top. Include the date, issue number, co
 TODO: extract this information based on git commit messages and generate changelogs / release notes automatically.
 {% endcomment %}
 
-h3. 2018-07-31: "#13497":https://dev.arvados.org/issues/13497 "db5107dca":https://dev.arvados.org/projects/arvados/repository/revisions/db5107dca adds a new system service, arvados-controller
-* "Install the controller":../install/install-controller.html after upgrading your system.
-* Verify your setup by confirming that API calls appear in the controller's logs (_e.g._, @journalctl -fu arvados-controller@) while loading a workbench page.
+h3. v1.2.0 (2018-09-05)
 
-h3. 2018-04-05: v1.1.4 regression in arvados-cwl-runner for workflows that rely on implicit discovery of secondaryFiles
+h4. Regenerate Postgres table statistics
 
-h4. Secondary files missing from toplevel workflow inputs
+It is recommended to regenerate the Postgres table statistics after upgrading to v1.2.0. If autovacuum is enabled on your installation, a script like the following will do it:
+
+<pre>
+#!/bin/bash
+
+set -e
+set -u
+
+tables=`echo "\dt" | psql arvados_production | grep public | awk -e '{print $3}'`
+
+for t in $tables; do
+    echo "echo 'analyze $t' | psql arvados_production"
+    time echo "analyze $t" | psql arvados_production
+done
+</pre>
+
+If you also need to do the vacuum, you could adapt the script to run 'vacuum analyze' instead of 'analyze'.
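+
+For example (a hypothetical adaptation of the script above), the body of the loop would become:
+
+<pre>
+    time echo "vacuum analyze $t" | psql arvados_production
+</pre>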
+
+h4. New component: arvados-controller
+
+Commit "db5107dca":https://dev.arvados.org/projects/arvados/repository/revisions/db5107dca adds a new system service, arvados-controller. More detail is available in story "#13496":https://dev.arvados.org/issues/13497.
+
+To add the Arvados Controller to your system, please refer to the "installation instructions":../install/install-controller.html after upgrading your system to 1.2.0.
+
+Verify your setup by confirming that API calls appear in the controller's logs (_e.g._, @journalctl -fu arvados-controller@) while loading a workbench page.
+
+h3. v1.1.4 (2018-04-10)
+
+h4. arvados-cwl-runner regressions (2018-04-05)
+
+<strong>Secondary files missing from toplevel workflow inputs</strong>
+
+This only affects workflows that rely on implicit discovery of secondaryFiles.
 
 If a workflow input does not declare @secondaryFiles@ corresponding to the @secondaryFiles@ of workflow steps which use the input, the workflow would inconsistently succeed or fail depending on whether the input values were specified as local files or referenced an existing collection (and whether the existing collection contained the secondary files or not).  To ensure consistent behavior, the workflow is now required to declare in the top level workflow inputs any secondaryFiles that are expected by workflow steps.
 
@@ -108,9 +138,11 @@ steps:
 </code></pre>
 </notextile>
 
-h4. Secondary files on default file inputs
+This bug has been fixed in Arvados release v1.2.0.
+
+<strong>Secondary files on default file inputs</strong>
 
-Due to a bug in Arvados v1.1.4, @File@ inputs that have default values and also expect @secondaryFiles@ and will fail to upload default @secondaryFiles@.  As an example, the following case will fail:
+@File@ inputs that have default values and also expect @secondaryFiles@ will fail to upload the default @secondaryFiles@.  As an example, the following case will fail:
 
 <pre>
 class: CommandLineTool
@@ -153,9 +185,18 @@ baseCommand: echo
 </code></pre>
 </notextile>
 
-This bug will be fixed in an upcoming release of Arvados.
+This bug has been fixed in Arvados release v1.2.0.
+
+h3. v1.1.3 (2018-02-08)
+
+There are no special upgrade notes for this release.
+
+h3. v1.1.2 (2017-12-22)
+
+h4. The minimum version for Postgres is now 9.4 (2017-12-08)
+
+As part of story "#11908":https://dev.arvados.org/issues/11908, commit "8f987a9271":https://dev.arvados.org/projects/arvados/repository/revisions/8f987a9271 introduces a dependency on Postgres 9.4. Previously, Arvados required Postgres 9.3.
 
-h3. 2017-12-08: "#11908":https://dev.arvados.org/issues/11908 "8f987a9271":https://dev.arvados.org/projects/arvados/repository/revisions/8f987a9271 now requires minimum of Postgres 9.4 (previously 9.3)
 * Debian 8 (pg 9.4) and Debian 9 (pg 9.6) do not require an upgrade
 * Ubuntu 16.04 (pg 9.5) does not require an upgrade
 * Ubuntu 14.04 (pg 9.3) requires upgrade to Postgres 9.4: https://www.postgresql.org/download/linux/ubuntu/
@@ -164,7 +205,16 @@ h3. 2017-12-08: "#11908":https://dev.arvados.org/issues/11908 "8f987a9271":https
 *# Install the @rh-postgresql94@ backport package from either Software Collections: http://doc.arvados.org/install/install-postgresql.html or the Postgres developers: https://www.postgresql.org/download/linux/redhat/
 *# Restore from the backup using @psql@
 
-h3. 2017-09-25: "#12032":https://dev.arvados.org/issues/12032 "68bdf4cbb":https://dev.arvados.org/projects/arvados/repository/revisions/68bdf4cbb now requires minimum of Postgres 9.3 (previously 9.1)
+h3. v1.1.1 (2017-11-30)
+
+There are no special upgrade notes for this release.
+
+h3. v1.1.0 (2017-10-24)
+
+h4. The minimum version for Postgres is now 9.3 (2017-09-25)
+
+As part of story "#12032":https://dev.arvados.org/issues/12032, commit "68bdf4cbb1":https://dev.arvados.org/projects/arvados/repository/revisions/68bdf4cbb1 introduces a dependency on Postgres 9.3. Previously, Arvados required Postgres 9.1.
+
 * Debian 8 (pg 9.4) and Debian 9 (pg 9.6) do not require an upgrade
 * Ubuntu 16.04 (pg 9.5) does not require an upgrade
 * Ubuntu 14.04 (pg 9.3) is compatible, however upgrading to Postgres 9.4 is recommended: https://www.postgresql.org/download/linux/ubuntu/
@@ -173,21 +223,34 @@ h3. 2017-09-25: "#12032":https://dev.arvados.org/issues/12032 "68bdf4cbb":https:
 *# Install the @rh-postgresql94@ backport package from either Software Collections: http://doc.arvados.org/install/install-postgresql.html or the Postgres developers: https://www.postgresql.org/download/linux/redhat/
 *# Restore from the backup using @psql@
 
-h3. 2017-06-30: "#11807":https://dev.arvados.org/issues/11807 "55aafbb":https://dev.arvados.org/projects/arvados/repository/revisions/55aafbb converts old "jobs" database records from YAML to JSON, making the upgrade process slower than usual.
+h3. Older versions
+
+h4. Upgrade slower than usual (2017-06-30)
+
+As part of story "#11807":https://dev.arvados.org/issues/11807, commit "55aafbb":https://dev.arvados.org/projects/arvados/repository/revisions/55aafbb converts old "jobs" database records from YAML to JSON, making the upgrade process slower than usual.
+
 * The migration can take some time if your database contains a substantial number of YAML-serialized rows (i.e., you installed Arvados before March 3, 2017 "660a614":https://dev.arvados.org/projects/arvados/repository/revisions/660a614 and used the jobs/pipelines APIs). Otherwise, the upgrade will be no slower than usual.
 * The conversion runs as a database migration, i.e., during the deb/rpm package upgrade process, while your API server is unavailable.
 * Expect it to take about 1 minute per 20K jobs that have ever been created/run.
 
-h3. 2017-06-05: "#9005":https://dev.arvados.org/issues/9005 "cb230b0":https://dev.arvados.org/projects/arvados/repository/revisions/cb230b0 reduces service discovery overhead in keep-web requests.
+h4. Service discovery overhead change in keep-web (2017-06-05)
+
+As part of story "#9005":https://dev.arvados.org/issues/9005, commit "cb230b0":https://dev.arvados.org/projects/arvados/repository/revisions/cb230b0 reduces service discovery overhead in keep-web requests.
+
 * When upgrading keep-web _or keepproxy_ to/past this version, make sure to update API server as well. Otherwise, a bad token in a request can cause keep-web to fail future requests until either keep-web restarts or API server gets upgraded.
 
-h3. 2017-04-12: "#11349":https://dev.arvados.org/issues/11349 "2c094e2":https://dev.arvados.org/projects/arvados/repository/revisions/2c094e2 adds a "management" http server to nodemanager.
+h4. Node manager now has an http endpoint for management (2017-04-12)
+
+As part of story "#11349":https://dev.arvados.org/issues/11349, commit "2c094e2":https://dev.arvados.org/projects/arvados/repository/revisions/2c094e2 adds a "management" http server to nodemanager.
+
 * To enable it, add to your configuration file: <pre>[Manage]
   address = 127.0.0.1
   port = 8989</pre> (see example configuration files in source:services/nodemanager/doc or https://doc.arvados.org/install/install-nodemanager.html for more info)
 * The server responds to @http://{address}:{port}/status.json@ with a summary of how many nodes are in each state (booting, busy, shutdown, etc.)
 
-h3. 2017-03-23: "#10766":https://dev.arvados.org/issues/10766 "e8cc0d7":https://dev.arvados.org/projects/arvados/repository/revisions/e8cc0d7 replaces puma with arvados-ws as the recommended websocket server.
+h4. New websockets component (2017-03-23)
+
+As part of story "#10766":https://dev.arvados.org/issues/10766, commit "e8cc0d7":https://dev.arvados.org/projects/arvados/repository/revisions/e8cc0d7 replaces puma with arvados-ws as the recommended websocket server.
 * See http://doc.arvados.org/install/install-ws.html for install/upgrade instructions.
 * Remove the old puma server after the upgrade is complete. Example, with runit: <pre>
 $ sudo sv down /etc/sv/puma
@@ -197,17 +260,25 @@ $ systemctl disable puma
 $ systemctl stop puma
 </pre>
 
-h3. 2017-03-06: "#11168":https://dev.arvados.org/issues/11168 "660a614":https://dev.arvados.org/projects/arvados/repository/revisions/660a614 uses JSON instead of YAML to encode hashes and arrays in the database.
+h4. Change of database encoding for hashes and arrays (2017-03-06)
+
+As part of story "#11168":https://dev.arvados.org/issues/11168, commit "660a614":https://dev.arvados.org/projects/arvados/repository/revisions/660a614 uses JSON instead of YAML to encode hashes and arrays in the database.
+
 * Aside from a slight performance improvement, this should have no externally visible effect.
 * Downgrading past this version is not supported, and is likely to cause errors. If this happens, the solution is to upgrade past this version.
 * After upgrading, make sure to restart puma and crunch-dispatch-* processes.
 
-h3. 2017-02-03: "#10969":https://dev.arvados.org/issues/10969 "74a9dec":https://dev.arvados.org/projects/arvados/repository/revisions/74a9dec introduces a Docker image format compatibility check: the @arv keep docker@ command prevents users from inadvertently saving docker images that compute nodes won't be able to run.
+h4. Docker image format compatibility check (2017-02-03)
+
+As part of story "#10969":https://dev.arvados.org/issues/10969, commit "74a9dec":https://dev.arvados.org/projects/arvados/repository/revisions/74a9dec introduces a Docker image format compatibility check: the @arv keep docker@ command prevents users from inadvertently saving docker images that compute nodes won't be able to run.
 * If your compute nodes run a version of *docker older than 1.10* you must override the default by adding to your API server configuration (@/etc/arvados/api/application.yml@): <pre><code class="yaml">docker_image_formats: ["v1"]</code></pre>
 * Refer to the comments above @docker_image_formats@ in @/var/www/arvados-api/current/config/application.default.yml@ or source:services/api/config/application.default.yml or issue "#10969":https://dev.arvados.org/issues/10969 for more detail.
 * *NOTE:* This does *not* include any support for migrating existing Docker images from v1 to v2 format. This will come later: for now, sites running Docker 1.9 or earlier should still *avoid upgrading Docker further than 1.9.*
 
-h3. 2016-09-27: several Debian and RPM packages -- keep-balance ("d9eec0b":https://dev.arvados.org/projects/arvados/repository/revisions/d9eec0b), keep-web ("3399e63":https://dev.arvados.org/projects/arvados/repository/revisions/3399e63), keepproxy ("6de67b6":https://dev.arvados.org/projects/arvados/repository/revisions/6de67b6), and arvados-git-httpd ("9e27ddf":https://dev.arvados.org/projects/arvados/repository/revisions/9e27ddf) -- now enable their respective components using systemd. These components prefer YAML configuration files over command line flags ("3bbe1cd":https://dev.arvados.org/projects/arvados/repository/revisions/3bbe1cd).
+h4. Debian and RPM packages now have systemd unit files (2016-09-27)
+
+Several Debian and RPM packages -- keep-balance ("d9eec0b":https://dev.arvados.org/projects/arvados/repository/revisions/d9eec0b), keep-web ("3399e63":https://dev.arvados.org/projects/arvados/repository/revisions/3399e63), keepproxy ("6de67b6":https://dev.arvados.org/projects/arvados/repository/revisions/6de67b6), and arvados-git-httpd ("9e27ddf":https://dev.arvados.org/projects/arvados/repository/revisions/9e27ddf) -- now enable their respective components using systemd. These components prefer YAML configuration files over command line flags ("3bbe1cd":https://dev.arvados.org/projects/arvados/repository/revisions/3bbe1cd).
+
 * On Debian-based systems using systemd, services are enabled automatically when packages are installed.
 * On RedHat-based systems using systemd, unit files are installed but services must be enabled explicitly: e.g., <code>"sudo systemctl enable keep-web; sudo systemctl start keep-web"</code>.
 * The new systemd-supervised services will not start up successfully until configuration files are installed in /etc/arvados/: e.g., <code>"Sep 26 18:23:55 62751f5bb946 keep-web[74]: 2016/09/26 18:23:55 open /etc/arvados/keep-web/keep-web.yml: no such file or directory"</code>
@@ -222,33 +293,57 @@ h3. 2016-09-27: several Debian and RPM packages -- keep-balance ("d9eec0b":https
 ** keepproxy - /etc/arvados/keepproxy/keepproxy.yml
 ** arvados-git-httpd - /etc/arvados/arv-git-httpd/arv-git-httpd.yml
 
-h3. 2016-05-31: "ae72b172c8":https://dev.arvados.org/projects/arvados/repository/revisions/ae72b172c8 and "3aae316c25":https://dev.arvados.org/projects/arvados/repository/revisions/3aae316c25 install Python modules and scripts to different locations on the filesystem.
+h4. Installation paths for Python modules and script changed (2016-05-31)
+
+Commits "ae72b172c8":https://dev.arvados.org/projects/arvados/repository/revisions/ae72b172c8 and "3aae316c25":https://dev.arvados.org/projects/arvados/repository/revisions/3aae316c25 change the filesystem location where Python modules and scripts are installed.
+
 * Previous packages installed these files to the distribution's preferred path under @/usr/local@ (or the equivalent location in a Software Collection).  Now they get installed to a path under @/usr@.  This improves compatibility with other Python packages provided by the distribution.  See "#9242":https://dev.arvados.org/issues/9242 for more background.
 * If you simply import Python modules from scripts, or call Python tools relying on $PATH, you don't need to make any changes.  If you have hardcoded full paths to some of these files (e.g., in symbolic links or configuration files), you will need to update those paths after this upgrade.
 
-h3. 2016-04-25: "eebcb5e":https://dev.arvados.org/projects/arvados/repository/revisions/eebcb5e requires the crunchrunner package to be installed on compute nodes and shell nodes in order to run CWL workflows.
+h4. Crunchrunner package is required on compute and shell nodes (2016-04-25)
+
+Commit "eebcb5e":https://dev.arvados.org/projects/arvados/repository/revisions/eebcb5e requires the crunchrunner package to be installed on compute nodes and shell nodes in order to run CWL workflows.
+
 * On each Debian-based compute node and shell node, run: @sudo apt-get install crunchrunner@
 * On each Red Hat-based compute node and shell node, run: @sudo yum install crunchrunner@
 
-h3. 2016-04-21: "3c88abd":https://dev.arvados.org/projects/arvados/repository/revisions/3c88abd changes the Keep permission signature algorithm.
+h4. Keep permission signature algorithm change (2016-04-21)
+
+Commit "3c88abd":https://dev.arvados.org/projects/arvados/repository/revisions/3c88abd changes the Keep permission signature algorithm.
+
 * All software components that generate signatures must be upgraded together. These are: keepstore, API server, keep-block-check, and keep-rsync. For example, if keepstore < 0.1.20160421183420 but API server >= 0.1.20160421183420, clients will not be able to read or write data in Keep.
 * Jobs and client operations that are in progress during the upgrade (including arv-put's "resume cache") will fail.
 
-h3. 2015-01-05: "e1276d6e":https://dev.arvados.org/projects/arvados/repository/revisions/e1276d6e disables Workbench's "Getting Started" popup by default.
+h4. Workbench's "Getting Started" popup disabled by default (2015-01-05)
+
+Commit "e1276d6e":https://dev.arvados.org/projects/arvados/repository/revisions/e1276d6e disables Workbench's "Getting Started" popup by default.
+
 * If you want new users to continue seeing this popup, set @enable_getting_started_popup: true@ in Workbench's @application.yml@ configuration.
 
-h3. 2015-12-03: "5590c9ac":https://dev.arvados.org/projects/arvados/repository/revisions/5590c9ac makes a Keep-backed writable scratch directory available in crunch jobs (see "#7751":https://dev.arvados.org/issues/7751)
+h4. Crunch jobs now have access to Keep-backed writable scratch storage (2015-12-03)
+
+Commit "5590c9ac":https://dev.arvados.org/projects/arvados/repository/revisions/5590c9ac makes a Keep-backed writable scratch directory available in crunch jobs (see "#7751":https://dev.arvados.org/issues/7751)
+
 * All compute nodes must be upgraded to arvados-fuse >= 0.1.2015112518060 because crunch-job uses some new arv-mount flags (--mount-tmp, --mount-by-pdh) introduced in merge "346a558":https://dev.arvados.org/projects/arvados/repository/revisions/346a558
 * Jobs will fail if the API server (in particular crunch-job from the arvados-cli gem) is upgraded without upgrading arvados-fuse on compute nodes.
 
-h3. 2015-11-11: "1e2ace5":https://dev.arvados.org/projects/arvados/repository/revisions/1e2ace5 changes recommended config for keep-web (see "#5824":https://dev.arvados.org/issues/5824)
+h4. Recommended configuration change for keep-web (2015-11-11)
+
+Commit "1e2ace5":https://dev.arvados.org/projects/arvados/repository/revisions/1e2ace5 changes recommended config for keep-web (see "#5824":https://dev.arvados.org/issues/5824)
+
 * proxy/dns/ssl config should be updated to route "https://download.uuid_prefix.arvadosapi.com/" requests to keep-web (alongside the existing "collections" routing)
 * keep-web command line adds @-attachment-only-host download.uuid_prefix.arvadosapi.com@
 * Workbench config adds @keep_web_download_url@
 * More info on the (still beta/non-TOC-linked) "keep-web doc page":http://doc.arvados.org/install/install-keep-web.html
 
-h3. 2015-11-04: "1d1c6de":https://dev.arvados.org/projects/arvados/repository/revisions/1d1c6de removes stopped containers (see "#7444":https://dev.arvados.org/issues/7444)
+h4. Stopped containers are now automatically removed on compute nodes (2015-11-04)
+
+Commit "1d1c6de":https://dev.arvados.org/projects/arvados/repository/revisions/1d1c6de removes stopped containers (see "#7444":https://dev.arvados.org/issues/7444)
+
 * arvados-docker-cleaner removes _all_ docker containers as soon as they exit, effectively making @docker run@ default to @--rm@. If you run arvados-docker-cleaner on a host that does anything other than run crunch-jobs, and you still want to be able to use @docker start@, read the "new doc page":http://doc.arvados.org/install/install-compute-node.html to learn how to turn this off before upgrading.
 
-h3. 2015-11-04: "21006cf":https://dev.arvados.org/projects/arvados/repository/revisions/21006cf adds a keep-web service (see "#5824":https://dev.arvados.org/issues/5824)
-* Nothing relies on it yet, but early adopters can install it now by following http://doc.arvados.org/install/install-keep-web.html (it is not yet linked in the TOC).
+h4. New keep-web service (2015-11-04)
+
+Commit "21006cf":https://dev.arvados.org/projects/arvados/repository/revisions/21006cf adds a new keep-web service (see "#5824":https://dev.arvados.org/issues/5824).
+
+* Nothing relies on keep-web yet, but early adopters can install it now by following http://doc.arvados.org/install/install-keep-web.html (it is not yet linked in the TOC).
diff --git a/doc/architecture/federation.html.textile.liquid b/doc/architecture/federation.html.textile.liquid
index ef91c44053f83468025171674aed34b176fcd1e3..08dad1e3173391b6c34e16b0eac5096b2598a028 100644 (file)
@@ -14,9 +14,11 @@ Arvados federation enables clients to transparently read, create and manipulate
 
 _This feature is under development.  Support for federation is limited to certain types of requests.  The behaviors described here should not be interpreted as a stable API._
 
+Detailed configuration information is available on the "federation admin section":{{site.baseurl}}/admin/federation.html.
+
 h2(#cluster_id). Cluster identifiers
 
-Clusters are identified by a five-digit alphanumeric id (numbers and lowercase letters).  There are 36^5^ = 60466176 possible cluster identifiers.
+Clusters are identified by a five-digit alphanumeric id (numbers and lowercase letters).  There are 36 ^5^ = 60466176 possible cluster identifiers.
 
* For automated test purposes, use "z****"
 * For experimental/local-only/private clusters that won't ever be visible on the public Internet, use "x****"
@@ -58,7 +60,7 @@ A federated user has a single identity across the cluster federation.  This iden
 
 h3. Authenticating remote users with salted tokens
 
-When making a request to the home cluster, authorization is established by looking up the API token in the @api_client_authorizations@ table to determine the user identity.  When making a request to a remote cluster, we need to provide an API token which can be used to establish the user's identity.  The remote cluster will connect back to the home cluster to determine if the token valid and the user it corresponds to.  However, we do not want to send along the same API token used for the original request.  If the remote cluster is malicious or compromised, sending along user's regular token would compromise the user account on the home cluster.  Instead, the controller sends a "salted token".  The salted token is restricted to only to fetching the user account and group membership.  The salted token consists of the uuid of the token in @api_client_authorizations@ and the SHA1 HMAC of the original token and the cluster id of remote cluster.  To verify the token, the remote cluster contacts the home cluster and provides the token uuid, the hash, and its cluster id.  The home cluster uses the uuid to look up the token re-computes the SHA1 HMAC of the original token and cluster id.  If that hash matches, the the token is valid.  To avoid having to re-validate the token on every request, it is cached for a short period.
+When making a request to the home cluster, authorization is established by looking up the API token in the @api_client_authorizations@ table to determine the user identity.  When making a request to a remote cluster, we need to provide an API token which can be used to establish the user's identity.  The remote cluster will connect back to the home cluster to determine if the token is valid and which user it corresponds to.  However, we do not want to send along the same API token used for the original request.  If the remote cluster is malicious or compromised, sending along the user's regular token would compromise the user account on the home cluster.  Instead, the controller sends a "salted token".  The salted token is restricted to fetching only the user account and group membership.  The salted token consists of the uuid of the token in @api_client_authorizations@ and the SHA1 HMAC of the original token and the cluster id of the remote cluster.  To verify the token, the remote cluster contacts the home cluster and provides the token uuid, the hash, and its cluster id.  The home cluster uses the uuid to look up the token and re-computes the SHA1 HMAC of the original token and cluster id.  If that hash matches, then the token is valid.  To avoid having to re-validate the token on every request, it is cached for a short period.
 
 The security properties of this scheme are:
 
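As a rough sketch of the salting scheme described above (assuming the HMAC is keyed on the token's secret part and computed over the remote cluster id; the helper and values below are hypothetical, not the controller's actual API):

<pre><code>
package main

import (
	"crypto/hmac"
	"crypto/sha1"
	"fmt"
)

// saltToken returns a v2-format token whose secret part is the SHA1
// HMAC of the original token secret and the remote cluster id, so the
// real secret never leaves the home cluster.
func saltToken(tokenUUID, tokenSecret, remoteClusterID string) string {
	mac := hmac.New(sha1.New, []byte(tokenSecret))
	mac.Write([]byte(remoteClusterID))
	return fmt.Sprintf("v2/%s/%x", tokenUUID, mac.Sum(nil))
}

func main() {
	// Hypothetical values for demonstration only.
	fmt.Println(saltToken("clsr1-gj3su-xxxxxxxxxxxxxxx", "examplesecret", "clsr2"))
}
</code></pre>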
diff --git a/lib/controller/fed_collections.go b/lib/controller/fed_collections.go
new file mode 100644 (file)
index 0000000..ab49e39
--- /dev/null
@@ -0,0 +1,314 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+       "bufio"
+       "bytes"
+       "context"
+       "crypto/md5"
+       "encoding/json"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "net/http"
+       "strings"
+       "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+func rewriteSignatures(clusterID string, expectHash string,
+       resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+
+       if requestError != nil {
+               return resp, requestError
+       }
+
+       if resp.StatusCode != http.StatusOK {
+               return resp, nil
+       }
+
+       originalBody := resp.Body
+       defer originalBody.Close()
+
+       var col arvados.Collection
+       err = json.NewDecoder(resp.Body).Decode(&col)
+       if err != nil {
+               return nil, err
+       }
+
+       // rewriting signatures will make manifest text 5-10% bigger so calculate
+       // capacity accordingly
+       updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
+
+       hasher := md5.New()
+       mw := io.MultiWriter(hasher, updatedManifest)
+       sz := 0
+
+       scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
+       scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
+       for scanner.Scan() {
+               line := scanner.Text()
+               tokens := strings.Split(line, " ")
+               if len(tokens) < 3 {
+                       return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+               }
+
+               n, err := mw.Write([]byte(tokens[0]))
+               if err != nil {
+                       return nil, fmt.Errorf("Error updating manifest: %v", err)
+               }
+               sz += n
+               for _, token := range tokens[1:] {
+                       n, err = mw.Write([]byte(" "))
+                       if err != nil {
+                               return nil, fmt.Errorf("Error updating manifest: %v", err)
+                       }
+                       sz += n
+
+                       m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
+                       if m != nil {
+                               // Rewrite the block signature to be a remote signature
+                               _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
+                               if err != nil {
+                                       return nil, fmt.Errorf("Error updating manifest: %v", err)
+                               }
+
+                               // for hash checking, ignore signatures
+                               n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
+                               if err != nil {
+                                       return nil, fmt.Errorf("Error updating manifest: %v", err)
+                               }
+                               sz += n
+                       } else {
+                               n, err = mw.Write([]byte(token))
+                               if err != nil {
+                                       return nil, fmt.Errorf("Error updating manifest: %v", err)
+                               }
+                               sz += n
+                       }
+               }
+               n, err = mw.Write([]byte("\n"))
+               if err != nil {
+                       return nil, fmt.Errorf("Error updating manifest: %v", err)
+               }
+               sz += n
+       }
+
+       // Check that expected hash is consistent with
+       // portable_data_hash field of the returned record
+       if expectHash == "" {
+               expectHash = col.PortableDataHash
+       } else if expectHash != col.PortableDataHash {
+               return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", expectHash, col.PortableDataHash)
+       }
+
+       // Certify that the computed hash of the manifest_text matches our expectation
+       sum := hasher.Sum(nil)
+       computedHash := fmt.Sprintf("%x+%v", sum, sz)
+       if computedHash != expectHash {
+               return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
+       }
+
+       col.ManifestText = updatedManifest.String()
+
+       newbody, err := json.Marshal(col)
+       if err != nil {
+               return nil, err
+       }
+
+       buf := bytes.NewBuffer(newbody)
+       resp.Body = ioutil.NopCloser(buf)
+       resp.ContentLength = int64(buf.Len())
+       resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
+
+       return resp, nil
+}
+
+func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+       if requestError != nil {
+               return resp, requestError
+       }
+
+       if resp.StatusCode == http.StatusNotFound {
+               // Suppress returning this result, because we want to
+               // search the federation.
+               return nil, nil
+       }
+       return resp, nil
+}
+
+type searchRemoteClusterForPDH struct {
+       pdh           string
+       remoteID      string
+       mtx           *sync.Mutex
+       sentResponse  *bool
+       sharedContext *context.Context
+       cancelFunc    func()
+       errors        *[]string
+       statusCode    *int
+}
+
+func fetchRemoteCollectionByUUID(
+       h *genericFederatedRequestHandler,
+       effectiveMethod string,
+       clusterId *string,
+       uuid string,
+       remainder string,
+       w http.ResponseWriter,
+       req *http.Request) bool {
+
+       if effectiveMethod != "GET" {
+               // Only handle GET requests right now
+               return false
+       }
+
+       if uuid != "" {
+               // Collection UUID GET request
+               *clusterId = uuid[0:5]
+               if *clusterId != "" && *clusterId != h.handler.Cluster.ClusterID {
+                       // request for remote collection by uuid
+                       resp, err := h.handler.remoteClusterRequest(*clusterId, req)
+                       newResponse, err := rewriteSignatures(*clusterId, "", resp, err)
+                       h.handler.proxy.ForwardResponse(w, newResponse, err)
+                       return true
+               }
+       }
+
+       return false
+}
+
+func fetchRemoteCollectionByPDH(
+       h *genericFederatedRequestHandler,
+       effectiveMethod string,
+       clusterId *string,
+       uuid string,
+       remainder string,
+       w http.ResponseWriter,
+       req *http.Request) bool {
+
+       if effectiveMethod != "GET" {
+               // Only handle GET requests right now
+               return false
+       }
+
+       m := collectionsByPDHRe.FindStringSubmatch(req.URL.Path)
+       if len(m) != 2 {
+               return false
+       }
+
+       // Request for collection by PDH.  Search the federation.
+
+       // First, query the local cluster.
+       resp, err := h.handler.localClusterRequest(req)
+       newResp, err := filterLocalClusterResponse(resp, err)
+       if newResp != nil || err != nil {
+               h.handler.proxy.ForwardResponse(w, newResp, err)
+               return true
+       }
+
+       // Create a goroutine for each cluster in the
+       // RemoteClusters map.  The first valid result gets
+       // returned to the client.  When that happens, all
+       // other outstanding requests are cancelled
+       sharedContext, cancelFunc := context.WithCancel(req.Context())
+       req = req.WithContext(sharedContext)
+       wg := sync.WaitGroup{}
+       pdh := m[1]
+       success := make(chan *http.Response)
+       errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters))
+
+       // use channel as a semaphore to limit the number of concurrent
+       // requests at a time
+       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
+
+       defer cancelFunc()
+
+       for remoteID := range h.handler.Cluster.RemoteClusters {
+               if remoteID == h.handler.Cluster.ClusterID {
+                       // No need to query local cluster again
+                       continue
+               }
+
+               wg.Add(1)
+               go func(remote string) {
+                       defer wg.Done()
+                       // blocks until it can put a value into the
+                       // channel (which has a max queue capacity)
+                       sem <- true
+                       select {
+                       case <-sharedContext.Done():
+                               return
+                       default:
+                       }
+
+                       resp, err := h.handler.remoteClusterRequest(remote, req)
+                       wasSuccess := false
+                       defer func() {
+                               if resp != nil && !wasSuccess {
+                                       resp.Body.Close()
+                               }
+                       }()
+                       if err != nil {
+                               errorChan <- err
+                               return
+                       }
+                       if resp.StatusCode != http.StatusOK {
+                               errorChan <- HTTPError{resp.Status, resp.StatusCode}
+                               return
+                       }
+                       select {
+                       case <-sharedContext.Done():
+                               return
+                       default:
+                       }
+
+                       newResponse, err := rewriteSignatures(remote, pdh, resp, nil)
+                       if err != nil {
+                               errorChan <- err
+                               return
+                       }
+                       select {
+                       case <-sharedContext.Done():
+                       case success <- newResponse:
+                               wasSuccess = true
+                       }
+                       <-sem
+               }(remoteID)
+       }
+       go func() {
+               wg.Wait()
+               cancelFunc()
+       }()
+
+       errorCode := http.StatusNotFound
+
+       for {
+               select {
+               case newResp = <-success:
+                       h.handler.proxy.ForwardResponse(w, newResp, nil)
+                       return true
+               case <-sharedContext.Done():
+                       var errors []string
+                       for len(errorChan) > 0 {
+                               err := <-errorChan
+                               if httperr, ok := err.(HTTPError); ok {
+                                       if httperr.Code != http.StatusNotFound {
+                                               errorCode = http.StatusBadGateway
+                                       }
+                               }
+                               errors = append(errors, err.Error())
+                       }
+                       httpserver.Errors(w, errors, errorCode)
+                       return true
+               }
+       }
+
+       // shouldn't ever get here
+       return true
+}
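
For reference, the signature rewrite performed by @rewriteSignatures@ above replaces a local signature hint (@+A...@) on each block locator with a remote-cluster hint (@+R<cluster>-...@), along these lines (illustrative locator, abbreviated signature):

<pre>
before: acbd18db4cc2f85cedef654fccc4a4d8+3+A1f27a35...@5f612ab2
after:  acbd18db4cc2f85cedef654fccc4a4d8+3+Rclsr2-1f27a35...@5f612ab2
</pre>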
diff --git a/lib/controller/fed_containers.go b/lib/controller/fed_containers.go
new file mode 100644 (file)
index 0000000..7fd5b25
--- /dev/null
@@ -0,0 +1,122 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "strings"
+
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+)
+
+func remoteContainerRequestCreate(
+       h *genericFederatedRequestHandler,
+       effectiveMethod string,
+       clusterId *string,
+       uuid string,
+       remainder string,
+       w http.ResponseWriter,
+       req *http.Request) bool {
+
+       if effectiveMethod != "POST" || uuid != "" || remainder != "" {
+               return false
+       }
+
+       // First make sure supplied token is valid.
+       creds := auth.NewCredentials()
+       creds.LoadTokensFromHTTPRequest(req)
+
+       currentUser, err := h.handler.validateAPItoken(req, creds.Tokens[0])
+       if err != nil {
+               httpserver.Error(w, err.Error(), http.StatusForbidden)
+               return true
+       }
+
+       if *clusterId == "" {
+               *clusterId = h.handler.Cluster.ClusterID
+       }
+
+       if strings.HasPrefix(currentUser.Authorization.UUID, h.handler.Cluster.ClusterID) &&
+               *clusterId == h.handler.Cluster.ClusterID {
+               // local user submitting container request to local cluster
+               return false
+       }
+
+       if req.Header.Get("Content-Type") != "application/json" {
+               httpserver.Error(w, "Expected Content-Type: application/json, got "+req.Header.Get("Content-Type"), http.StatusBadRequest)
+               return true
+       }
+
+       originalBody := req.Body
+       defer originalBody.Close()
+       var request map[string]interface{}
+       err = json.NewDecoder(req.Body).Decode(&request)
+       if err != nil {
+               httpserver.Error(w, err.Error(), http.StatusBadRequest)
+               return true
+       }
+
+       crString, ok := request["container_request"].(string)
+       if ok {
+               var crJson map[string]interface{}
+               err := json.Unmarshal([]byte(crString), &crJson)
+               if err != nil {
+                       httpserver.Error(w, err.Error(), http.StatusBadRequest)
+                       return true
+               }
+
+               request["container_request"] = crJson
+       }
+
+       containerRequest, ok := request["container_request"].(map[string]interface{})
+       if !ok {
+               // Use toplevel object as the container_request object
+               containerRequest = request
+       }
+
+       // If runtime_token is not set, create a new token
+       if _, ok := containerRequest["runtime_token"]; !ok {
+               if len(currentUser.Authorization.Scopes) != 1 || currentUser.Authorization.Scopes[0] != "all" {
+                       httpserver.Error(w, "Token scope is not [all]", http.StatusForbidden)
+                       return true
+               }
+
+               if strings.HasPrefix(currentUser.Authorization.UUID, h.handler.Cluster.ClusterID) {
+                       // Local user, submitting to a remote cluster.
+                       // Create a new time-limited token.
+                       newtok, err := h.handler.createAPItoken(req, currentUser.UUID, nil)
+                       if err != nil {
+                               httpserver.Error(w, err.Error(), http.StatusForbidden)
+                               return true
+                       }
+                       containerRequest["runtime_token"] = newtok.TokenV2()
+               } else {
+                       // Remote user. Container request will use the
+                       // current token, minus the trailing portion
+                       // (optional container uuid).
+                       sp := strings.Split(creds.Tokens[0], "/")
+                       if len(sp) >= 3 {
+                               containerRequest["runtime_token"] = strings.Join(sp[0:3], "/")
+                       } else {
+                               containerRequest["runtime_token"] = creds.Tokens[0]
+                       }
+               }
+       }
+
+       newbody, err := json.Marshal(request)
+       if err != nil {
+               httpserver.Error(w, err.Error(), http.StatusInternalServerError)
+               return true
+       }
+       buf := bytes.NewBuffer(newbody)
+       req.Body = ioutil.NopCloser(buf)
+       req.ContentLength = int64(buf.Len())
+       req.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
+
+       resp, err := h.handler.remoteClusterRequest(*clusterId, req)
+       h.handler.proxy.ForwardResponse(w, resp, err)
+       return true
+}
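
As the code above shows, when a remote user's request reaches @remoteContainerRequestCreate@, any optional trailing container-uuid segment of the v2-format token is dropped before it is stored as @runtime_token@; only the first three @/@-separated segments are kept (hypothetical token shown):

<pre>
v2/clsr2-gj3su-xxxxxxxxxxxxxxx/secretpart/clsr2-dz642-xxxxxxxxxxxxxxx
  becomes
v2/clsr2-gj3su-xxxxxxxxxxxxxxx/secretpart
</pre>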
diff --git a/lib/controller/fed_generic.go b/lib/controller/fed_generic.go
new file mode 100644 (file)
index 0000000..9c8b161
--- /dev/null
@@ -0,0 +1,352 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "net/url"
+       "regexp"
+       "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+)
+
+type federatedRequestDelegate func(
+       h *genericFederatedRequestHandler,
+       effectiveMethod string,
+       clusterId *string,
+       uuid string,
+       remainder string,
+       w http.ResponseWriter,
+       req *http.Request) bool
+
+type genericFederatedRequestHandler struct {
+       next      http.Handler
+       handler   *Handler
+       matcher   *regexp.Regexp
+       delegates []federatedRequestDelegate
+}
+
+func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
+       req *http.Request,
+       clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
+
+       found := make(map[string]bool)
+       prev_len_uuids := len(uuids) + 1
+       // Loop while
+       // (1) there are more uuids to query, and
+       // (2) we're making progress - on each iteration the set of
+       // uuids we are still waiting for must shrink.
+       for len(uuids) > 0 && len(uuids) < prev_len_uuids {
+               var remoteReq http.Request
+               remoteReq.Header = req.Header
+               remoteReq.Method = "POST"
+               remoteReq.URL = &url.URL{Path: req.URL.Path}
+               remoteParams := make(url.Values)
+               remoteParams.Set("_method", "GET")
+               remoteParams.Set("count", "none")
+               if req.Form.Get("select") != "" {
+                       remoteParams.Set("select", req.Form.Get("select"))
+               }
+               content, err := json.Marshal(uuids)
+               if err != nil {
+                       return nil, "", err
+               }
+               remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
+               enc := remoteParams.Encode()
+               remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
+
+               rc := multiClusterQueryResponseCollector{clusterID: clusterID}
+
+               var resp *http.Response
+               if clusterID == h.handler.Cluster.ClusterID {
+                       resp, err = h.handler.localClusterRequest(&remoteReq)
+               } else {
+                       resp, err = h.handler.remoteClusterRequest(clusterID, &remoteReq)
+               }
+               rc.collectResponse(resp, err)
+
+               if rc.error != nil {
+                       return nil, "", rc.error
+               }
+
+               kind = rc.kind
+
+               if len(rc.responses) == 0 {
+                       // We got zero responses, no point in doing
+                       // another query.
+                       return rp, kind, nil
+               }
+
+               rp = append(rp, rc.responses...)
+
+               // Go through the responses and determine what was
+               // returned.  If there are remaining items, loop
+               // around and do another request with just the
+               // stragglers.
+               for _, i := range rc.responses {
+                       uuid, ok := i["uuid"].(string)
+                       if ok {
+                               found[uuid] = true
+                       }
+               }
+
+               l := []string{}
+               for _, u := range uuids {
+                       if !found[u] {
+                               l = append(l, u)
+                       }
+               }
+               prev_len_uuids = len(uuids)
+               uuids = l
+       }
+
+       return rp, kind, nil
+}
+
+func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
+       req *http.Request, clusterId *string) bool {
+
+       var filters [][]interface{}
+       err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
+       if err != nil {
+               httpserver.Error(w, err.Error(), http.StatusBadRequest)
+               return true
+       }
+
+       // Split the list of uuids by prefix
+       queryClusters := make(map[string][]string)
+       expectCount := 0
+       for _, filter := range filters {
+               if len(filter) != 3 {
+                       return false
+               }
+
+               if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
+                       return false
+               }
+
+               op, ok := filter[1].(string)
+               if !ok {
+                       return false
+               }
+
+               if op == "in" {
+                       if rhs, ok := filter[2].([]interface{}); ok {
+                               for _, i := range rhs {
+                                       if u, ok := i.(string); ok && len(u) == 27 {
+                                               *clusterId = u[0:5]
+                                               queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+                                               expectCount += 1
+                                       }
+                               }
+                       }
+               } else if op == "=" {
+                       if u, ok := filter[2].(string); ok && len(u) == 27 {
+                               *clusterId = u[0:5]
+                               queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+                               expectCount += 1
+                       }
+               } else {
+                       return false
+               }
+
+       }
+
+       if len(queryClusters) <= 1 {
+               // Query does not search for uuids across multiple
+               // clusters.
+               return false
+       }
+
+       // Validations
+       count := req.Form.Get("count")
+       if count != "" && count != `none` && count != `"none"` {
+               httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
+               return true
+       }
+       if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
+               httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
+               return true
+       }
+       if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
+               httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects exceeds the maximum page size of %v.",
+                       expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
+               return true
+       }
+       if req.Form.Get("select") != "" {
+               foundUUID := false
+               var selects []string
+               err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
+               if err != nil {
+                       httpserver.Error(w, err.Error(), http.StatusBadRequest)
+                       return true
+               }
+
+               for _, r := range selects {
+                       if r == "uuid" {
+                               foundUUID = true
+                               break
+                       }
+               }
+               if !foundUUID {
+                       httpserver.Error(w, "Federated multi-object request must include 'uuid' in 'select'", http.StatusBadRequest)
+                       return true
+               }
+       }
+
+       // Perform concurrent requests to each cluster
+
+       // Use a buffered channel as a semaphore to limit the number of
+       // requests in flight at any one time.
+       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
+       defer close(sem)
+       wg := sync.WaitGroup{}
+
+       req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+       mtx := sync.Mutex{}
+       errors := []error{}
+       var completeResponses []map[string]interface{}
+       var kind string
+
+       for k, v := range queryClusters {
+               if len(v) == 0 {
+                       // Nothing to query
+                       continue
+               }
+
+               // Sending blocks until there is room in the
+               // channel (which has a fixed capacity).
+               sem <- true
+               wg.Add(1)
+               go func(k string, v []string) {
+                       rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
+                       mtx.Lock()
+                       if err == nil {
+                               completeResponses = append(completeResponses, rp...)
+                               kind = kn
+                       } else {
+                               errors = append(errors, err)
+                       }
+                       mtx.Unlock()
+                       wg.Done()
+                       <-sem
+               }(k, v)
+       }
+       wg.Wait()
+
+       if len(errors) > 0 {
+               var strerr []string
+               for _, e := range errors {
+                       strerr = append(strerr, e.Error())
+               }
+               httpserver.Errors(w, strerr, http.StatusBadGateway)
+               return true
+       }
+
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       itemList := make(map[string]interface{})
+       itemList["items"] = completeResponses
+       itemList["kind"] = kind
+       json.NewEncoder(w).Encode(itemList)
+
+       return true
+}
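
The buffered channel used above is the standard Go counting-semaphore idiom. An isolated sketch (runLimited is a hypothetical helper; the real limit comes from GetMultiClusterRequestConcurrency in the cluster config):

    import "sync"

    // runLimited calls fn for each item with at most limit calls in flight.
    func runLimited(items []string, limit int, fn func(string)) {
            sem := make(chan bool, limit)
            var wg sync.WaitGroup
            for _, it := range items {
                    sem <- true // blocks while `limit` calls are in flight
                    wg.Add(1)
                    go func(it string) {
                            defer wg.Done()
                            defer func() { <-sem }()
                            fn(it)
                    }(it)
            }
            wg.Wait()
    }
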
+
+func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+       m := h.matcher.FindStringSubmatch(req.URL.Path)
+       clusterId := ""
+
+       if len(m) > 0 && m[2] != "" {
+               clusterId = m[2]
+       }
+
+       // Get form parameters from URL and form body (if POST).
+       if err := loadParamsFromForm(req); err != nil {
+               httpserver.Error(w, err.Error(), http.StatusBadRequest)
+               return
+       }
+
+       // Check if the parameters have an explicit cluster_id
+       if req.Form.Get("cluster_id") != "" {
+               clusterId = req.Form.Get("cluster_id")
+       }
+
+       // Handle the POST-as-GET special case (workaround for large
+       // GET requests that potentially exceed maximum URL length,
+       // like multi-object queries where the filter has 100s of
+       // items)
+       effectiveMethod := req.Method
+       if req.Method == "POST" && req.Form.Get("_method") != "" {
+               effectiveMethod = req.Form.Get("_method")
+       }
+
+       if effectiveMethod == "GET" &&
+               clusterId == "" &&
+               req.Form.Get("filters") != "" &&
+               h.handleMultiClusterQuery(w, req, &clusterId) {
+               return
+       }
+
+       var uuid string
+       if len(m[1]) > 0 {
+               // trim leading slash
+               uuid = m[1][1:]
+       }
+       for _, d := range h.delegates {
+               if d(h, effectiveMethod, &clusterId, uuid, m[3], w, req) {
+                       return
+               }
+       }
+
+       if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
+               h.next.ServeHTTP(w, req)
+       } else {
+               resp, err := h.handler.remoteClusterRequest(clusterId, req)
+               h.handler.proxy.ForwardResponse(w, resp, err)
+       }
+}
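
For reference, a client exercises the POST-as-GET path handled above by form-encoding the query. A hedged sketch; the host, endpoint, and filter values are illustrative only:

    import (
            "net/http"
            "net/url"
            "strings"
    )

    // newPostAsGet builds a query too large for a URL as a form-encoded POST.
    func newPostAsGet() (*http.Request, error) {
            form := url.Values{}
            form.Set("_method", "GET")
            form.Set("count", "none")
            form.Set("filters", `[["uuid","in",["zzzzz-4zz18-aaaaaaaaaaaaaaa"]]]`)
            req, err := http.NewRequest("POST",
                    "https://controller.example/arvados/v1/collections",
                    strings.NewReader(form.Encode()))
            if err != nil {
                    return nil, err
            }
            req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
            return req, nil
    }
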
+
+type multiClusterQueryResponseCollector struct {
+       responses []map[string]interface{}
+       error     error
+       kind      string
+       clusterID string
+}
+
+func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
+       requestError error) (newResponse *http.Response, err error) {
+       if requestError != nil {
+               c.error = requestError
+               return nil, nil
+       }
+
+       defer resp.Body.Close()
+       var loadInto struct {
+               Kind   string                   `json:"kind"`
+               Items  []map[string]interface{} `json:"items"`
+               Errors []string                 `json:"errors"`
+       }
+       err = json.NewDecoder(resp.Body).Decode(&loadInto)
+
+       if err != nil {
+               c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
+               return nil, nil
+       }
+       if resp.StatusCode != http.StatusOK {
+               c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
+               return nil, nil
+       }
+
+       c.responses = loadInto.Items
+       c.kind = loadInto.Kind
+
+       return nil, nil
+}
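
The delegates slice consulted in ServeHTTP above takes functions with the signature visible at the call site (the type name federatedRequestDelegate appears in the setup code below). A hedged sketch of a pass-through delegate, for orientation:

    // passThrough is hypothetical; a delegate returns true iff it has
    // written the response and handling should stop.
    func passThrough(h *genericFederatedRequestHandler, effectiveMethod string,
            clusterId *string, uuid string, remainder string,
            w http.ResponseWriter, req *http.Request) bool {
            return false // fall through to default local/remote routing
    }
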
index 5c6f6bf7ab9d503c395701688555359a9e925e6b..557c7c3563d59c23644370765f466e63517f4d5a 100644 (file)
@@ -5,72 +5,56 @@
 package controller
 
 import (
-       "bufio"
        "bytes"
-       "context"
-       "crypto/md5"
        "database/sql"
        "encoding/json"
        "fmt"
        "io"
        "io/ioutil"
+       "mime"
        "net/http"
        "net/url"
        "regexp"
        "strings"
-       "sync"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/auth"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/jmcvetta/randutil"
 )
 
 var pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-[0-9a-z]{15})?(.*)$`
 var wfRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "workflows", "7fd4e"))
 var containersRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "containers", "dz642"))
 var containerRequestsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "container_requests", "xvhdp"))
-var collectionRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "collections", "4zz18"))
-var collectionByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)+$`)
+var collectionsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "collections", "4zz18"))
+var collectionsByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)+$`)
+var linksRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "links", "o0j2j"))
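
Each of these patterns captures the optional "/<uuid>" in m[1], the 5-character cluster prefix in m[2], and any trailing path in m[3]. For instance (illustrative UUID):

    m := containersRe.FindStringSubmatch("/arvados/v1/containers/zzzzz-dz642-abcdefghijklmno")
    // m[1] == "/zzzzz-dz642-abcdefghijklmno", m[2] == "zzzzz", m[3] == ""
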
 
-type genericFederatedRequestHandler struct {
-       next    http.Handler
-       handler *Handler
-       matcher *regexp.Regexp
-}
-
-type collectionFederatedRequestHandler struct {
-       next    http.Handler
-       handler *Handler
-}
-
-func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, req *http.Request, filter ResponseFilter) {
+func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*http.Response, error) {
        remote, ok := h.Cluster.RemoteClusters[remoteID]
        if !ok {
-               httpserver.Error(w, "no proxy available for cluster "+remoteID, http.StatusNotFound)
-               return
+               return nil, HTTPError{fmt.Sprintf("no proxy available for cluster %v", remoteID), http.StatusNotFound}
        }
        scheme := remote.Scheme
        if scheme == "" {
                scheme = "https"
        }
-       err := h.saltAuthToken(req, remoteID)
+       saltedReq, err := h.saltAuthToken(req, remoteID)
        if err != nil {
-               httpserver.Error(w, err.Error(), http.StatusBadRequest)
-               return
+               return nil, err
        }
        urlOut := &url.URL{
                Scheme:   scheme,
                Host:     remote.Host,
-               Path:     req.URL.Path,
-               RawPath:  req.URL.RawPath,
-               RawQuery: req.URL.RawQuery,
+               Path:     saltedReq.URL.Path,
+               RawPath:  saltedReq.URL.RawPath,
+               RawQuery: saltedReq.URL.RawQuery,
        }
        client := h.secureClient
        if remote.Insecure {
                client = h.insecureClient
        }
-       h.proxy.Do(w, req, urlOut, client, filter)
+       return h.proxy.Do(saltedReq, urlOut, client)
 }
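
With the response filter callback gone, a call site now composes the two halves explicitly, as the rewritten ServeHTTP above does:

    resp, err := h.remoteClusterRequest("zzzzz", req) // "zzzzz": example remote ID
    h.proxy.ForwardResponse(w, resp, err)             // writes HTTPError's status, or 502
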
 
 // Buffer request body, parse form parameters in request, and then
@@ -78,7 +62,11 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
 // downstream proxy steps.
 func loadParamsFromForm(req *http.Request) error {
        var postBody *bytes.Buffer
-       if req.Body != nil && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
+       if ct := req.Header.Get("Content-Type"); ct == "" {
+               // Assume application/octet-stream, i.e., no form to parse.
+       } else if ct, _, err := mime.ParseMediaType(ct); err != nil {
+               return err
+       } else if ct == "application/x-www-form-urlencoded" && req.Body != nil {
                var cl int64
                if req.ContentLength > 0 {
                        cl = req.ContentLength
@@ -100,596 +88,27 @@ func loadParamsFromForm(req *http.Request) error {
        return nil
 }
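
Using mime.ParseMediaType instead of a plain string compare means parameterized content types match too; the federation test below sends exactly such a header. Standard-library behavior, for reference:

    ct, params, err := mime.ParseMediaType("application/x-www-form-urlencoded; charset=UTF-8")
    // ct == "application/x-www-form-urlencoded", params["charset"] == "UTF-8", err == nil
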
 
-type multiClusterQueryResponseCollector struct {
-       responses []map[string]interface{}
-       error     error
-       kind      string
-       clusterID string
-}
-
-func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
-       requestError error) (newResponse *http.Response, err error) {
-       if requestError != nil {
-               c.error = requestError
-               return nil, nil
-       }
-
-       defer resp.Body.Close()
-       var loadInto struct {
-               Kind   string                   `json:"kind"`
-               Items  []map[string]interface{} `json:"items"`
-               Errors []string                 `json:"errors"`
-       }
-       err = json.NewDecoder(resp.Body).Decode(&loadInto)
-
-       if err != nil {
-               c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
-               return nil, nil
-       }
-       if resp.StatusCode != http.StatusOK {
-               c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
-               return nil, nil
-       }
-
-       c.responses = loadInto.Items
-       c.kind = loadInto.Kind
-
-       return nil, nil
-}
-
-func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
-       req *http.Request,
-       clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
-
-       found := make(map[string]bool)
-       prev_len_uuids := len(uuids) + 1
-       // Loop while
-       // (1) there are more uuids to query
-       // (2) we're making progress - on each iteration the set of
-       // uuids we are expecting for must shrink.
-       for len(uuids) > 0 && len(uuids) < prev_len_uuids {
-               var remoteReq http.Request
-               remoteReq.Header = req.Header
-               remoteReq.Method = "POST"
-               remoteReq.URL = &url.URL{Path: req.URL.Path}
-               remoteParams := make(url.Values)
-               remoteParams.Set("_method", "GET")
-               remoteParams.Set("count", "none")
-               if req.Form.Get("select") != "" {
-                       remoteParams.Set("select", req.Form.Get("select"))
-               }
-               content, err := json.Marshal(uuids)
-               if err != nil {
-                       return nil, "", err
-               }
-               remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
-               enc := remoteParams.Encode()
-               remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
-
-               rc := multiClusterQueryResponseCollector{clusterID: clusterID}
-
-               if clusterID == h.handler.Cluster.ClusterID {
-                       h.handler.localClusterRequest(w, &remoteReq,
-                               rc.collectResponse)
-               } else {
-                       h.handler.remoteClusterRequest(clusterID, w, &remoteReq,
-                               rc.collectResponse)
-               }
-               if rc.error != nil {
-                       return nil, "", rc.error
-               }
-
-               kind = rc.kind
-
-               if len(rc.responses) == 0 {
-                       // We got zero responses, no point in doing
-                       // another query.
-                       return rp, kind, nil
-               }
-
-               rp = append(rp, rc.responses...)
-
-               // Go through the responses and determine what was
-               // returned.  If there are remaining items, loop
-               // around and do another request with just the
-               // stragglers.
-               for _, i := range rc.responses {
-                       uuid, ok := i["uuid"].(string)
-                       if ok {
-                               found[uuid] = true
-                       }
-               }
-
-               l := []string{}
-               for _, u := range uuids {
-                       if !found[u] {
-                               l = append(l, u)
-                       }
-               }
-               prev_len_uuids = len(uuids)
-               uuids = l
-       }
-
-       return rp, kind, nil
-}
-
-func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
-       req *http.Request, clusterId *string) bool {
-
-       var filters [][]interface{}
-       err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
-       if err != nil {
-               httpserver.Error(w, err.Error(), http.StatusBadRequest)
-               return true
-       }
-
-       // Split the list of uuids by prefix
-       queryClusters := make(map[string][]string)
-       expectCount := 0
-       for _, filter := range filters {
-               if len(filter) != 3 {
-                       return false
-               }
-
-               if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
-                       return false
-               }
-
-               op, ok := filter[1].(string)
-               if !ok {
-                       return false
-               }
-
-               if op == "in" {
-                       if rhs, ok := filter[2].([]interface{}); ok {
-                               for _, i := range rhs {
-                                       if u, ok := i.(string); ok {
-                                               *clusterId = u[0:5]
-                                               queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
-                                               expectCount += 1
-                                       }
-                               }
-                       }
-               } else if op == "=" {
-                       if u, ok := filter[2].(string); ok {
-                               *clusterId = u[0:5]
-                               queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
-                               expectCount += 1
-                       }
-               } else {
-                       return false
-               }
-
-       }
-
-       if len(queryClusters) <= 1 {
-               // Query does not search for uuids across multiple
-               // clusters.
-               return false
-       }
-
-       // Validations
-       count := req.Form.Get("count")
-       if count != "" && count != `none` && count != `"none"` {
-               httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
-               return true
-       }
-       if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
-               httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
-               return true
-       }
-       if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
-               httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
-                       expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
-               return true
-       }
-       if req.Form.Get("select") != "" {
-               foundUUID := false
-               var selects []string
-               err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
-               if err != nil {
-                       httpserver.Error(w, err.Error(), http.StatusBadRequest)
-                       return true
-               }
-
-               for _, r := range selects {
-                       if r == "uuid" {
-                               foundUUID = true
-                               break
-                       }
-               }
-               if !foundUUID {
-                       httpserver.Error(w, "Federated multi-object request must include 'uuid' in 'select'", http.StatusBadRequest)
-                       return true
-               }
-       }
-
-       // Perform concurrent requests to each cluster
-
-       // use channel as a semaphore to limit the number of concurrent
-       // requests at a time
-       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-       defer close(sem)
-       wg := sync.WaitGroup{}
-
-       req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
-       mtx := sync.Mutex{}
-       errors := []error{}
-       var completeResponses []map[string]interface{}
-       var kind string
-
-       for k, v := range queryClusters {
-               if len(v) == 0 {
-                       // Nothing to query
-                       continue
-               }
-
-               // blocks until it can put a value into the
-               // channel (which has a max queue capacity)
-               sem <- true
-               wg.Add(1)
-               go func(k string, v []string) {
-                       rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
-                       mtx.Lock()
-                       if err == nil {
-                               completeResponses = append(completeResponses, rp...)
-                               kind = kn
-                       } else {
-                               errors = append(errors, err)
-                       }
-                       mtx.Unlock()
-                       wg.Done()
-                       <-sem
-               }(k, v)
-       }
-       wg.Wait()
-
-       if len(errors) > 0 {
-               var strerr []string
-               for _, e := range errors {
-                       strerr = append(strerr, e.Error())
-               }
-               httpserver.Errors(w, strerr, http.StatusBadGateway)
-               return true
-       }
-
-       w.Header().Set("Content-Type", "application/json")
-       w.WriteHeader(http.StatusOK)
-       itemList := make(map[string]interface{})
-       itemList["items"] = completeResponses
-       itemList["kind"] = kind
-       json.NewEncoder(w).Encode(itemList)
-
-       return true
-}
-
-func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
-       m := h.matcher.FindStringSubmatch(req.URL.Path)
-       clusterId := ""
-
-       if len(m) > 0 && m[2] != "" {
-               clusterId = m[2]
-       }
-
-       // Get form parameters from URL and form body (if POST).
-       if err := loadParamsFromForm(req); err != nil {
-               httpserver.Error(w, err.Error(), http.StatusBadRequest)
-               return
-       }
-
-       // Check if the parameters have an explicit cluster_id
-       if req.Form.Get("cluster_id") != "" {
-               clusterId = req.Form.Get("cluster_id")
-       }
-
-       // Handle the POST-as-GET special case (workaround for large
-       // GET requests that potentially exceed maximum URL length,
-       // like multi-object queries where the filter has 100s of
-       // items)
-       effectiveMethod := req.Method
-       if req.Method == "POST" && req.Form.Get("_method") != "" {
-               effectiveMethod = req.Form.Get("_method")
-       }
-
-       if effectiveMethod == "GET" &&
-               clusterId == "" &&
-               req.Form.Get("filters") != "" &&
-               h.handleMultiClusterQuery(w, req, &clusterId) {
-               return
-       }
-
-       if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
-               h.next.ServeHTTP(w, req)
-       } else {
-               h.handler.remoteClusterRequest(clusterId, w, req, nil)
-       }
-}
-
-type rewriteSignaturesClusterId struct {
-       clusterID  string
-       expectHash string
-}
-
-func (rw rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
-       if requestError != nil {
-               return resp, requestError
-       }
-
-       if resp.StatusCode != 200 {
-               return resp, nil
-       }
-
-       originalBody := resp.Body
-       defer originalBody.Close()
-
-       var col arvados.Collection
-       err = json.NewDecoder(resp.Body).Decode(&col)
-       if err != nil {
-               return nil, err
-       }
-
-       // rewriting signatures will make manifest text 5-10% bigger so calculate
-       // capacity accordingly
-       updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
-
-       hasher := md5.New()
-       mw := io.MultiWriter(hasher, updatedManifest)
-       sz := 0
-
-       scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
-       scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
-       for scanner.Scan() {
-               line := scanner.Text()
-               tokens := strings.Split(line, " ")
-               if len(tokens) < 3 {
-                       return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
-               }
-
-               n, err := mw.Write([]byte(tokens[0]))
-               if err != nil {
-                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-               }
-               sz += n
-               for _, token := range tokens[1:] {
-                       n, err = mw.Write([]byte(" "))
-                       if err != nil {
-                               return nil, fmt.Errorf("Error updating manifest: %v", err)
-                       }
-                       sz += n
-
-                       m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
-                       if m != nil {
-                               // Rewrite the block signature to be a remote signature
-                               _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], rw.clusterID, m[5][2:], m[8])
-                               if err != nil {
-                                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-                               }
-
-                               // for hash checking, ignore signatures
-                               n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
-                               if err != nil {
-                                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-                               }
-                               sz += n
-                       } else {
-                               n, err = mw.Write([]byte(token))
-                               if err != nil {
-                                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-                               }
-                               sz += n
-                       }
-               }
-               n, err = mw.Write([]byte("\n"))
-               if err != nil {
-                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-               }
-               sz += n
-       }
-
-       // Check that expected hash is consistent with
-       // portable_data_hash field of the returned record
-       if rw.expectHash == "" {
-               rw.expectHash = col.PortableDataHash
-       } else if rw.expectHash != col.PortableDataHash {
-               return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", rw.expectHash, col.PortableDataHash)
-       }
-
-       // Certify that the computed hash of the manifest_text matches our expectation
-       sum := hasher.Sum(nil)
-       computedHash := fmt.Sprintf("%x+%v", sum, sz)
-       if computedHash != rw.expectHash {
-               return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, rw.expectHash)
-       }
-
-       col.ManifestText = updatedManifest.String()
-
-       newbody, err := json.Marshal(col)
-       if err != nil {
-               return nil, err
-       }
-
-       buf := bytes.NewBuffer(newbody)
-       resp.Body = ioutil.NopCloser(buf)
-       resp.ContentLength = int64(buf.Len())
-       resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
-
-       return resp, nil
-}
-
-func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
-       if requestError != nil {
-               return resp, requestError
-       }
-
-       if resp.StatusCode == 404 {
-               // Suppress returning this result, because we want to
-               // search the federation.
-               return nil, nil
-       }
-       return resp, nil
-}
-
-type searchRemoteClusterForPDH struct {
-       pdh           string
-       remoteID      string
-       mtx           *sync.Mutex
-       sentResponse  *bool
-       sharedContext *context.Context
-       cancelFunc    func()
-       errors        *[]string
-       statusCode    *int
-}
-
-func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
-       s.mtx.Lock()
-       defer s.mtx.Unlock()
-
-       if *s.sentResponse {
-               // Another request already returned a response
-               return nil, nil
-       }
-
-       if requestError != nil {
-               *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
-               // Record the error and suppress response
-               return nil, nil
-       }
-
-       if resp.StatusCode != 200 {
-               // Suppress returning unsuccessful result.  Maybe
-               // another request will find it.
-               // TODO collect and return error responses.
-               *s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
-               if resp.StatusCode != 404 {
-                       // Got a non-404 error response, convert into BadGateway
-                       *s.statusCode = http.StatusBadGateway
-               }
-               return nil, nil
-       }
-
-       s.mtx.Unlock()
-
-       // This reads the response body.  We don't want to hold the
-       // lock while doing this because other remote requests could
-       // also have made it to this point, and we don't want a
-       // slow response holding the lock to block a faster response
-       // that is waiting on the lock.
-       newResponse, err = rewriteSignaturesClusterId{s.remoteID, s.pdh}.rewriteSignatures(resp, nil)
-
-       s.mtx.Lock()
-
-       if *s.sentResponse {
-               // Another request already returned a response
-               return nil, nil
-       }
-
-       if err != nil {
-               // Suppress returning unsuccessful result.  Maybe
-               // another request will be successful.
-               *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
-               return nil, nil
-       }
-
-       // We have a successful response.  Suppress/cancel all the
-       // other requests/responses.
-       *s.sentResponse = true
-       s.cancelFunc()
-
-       return newResponse, nil
-}
-
-func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
-       if req.Method != "GET" {
-               // Only handle GET requests right now
-               h.next.ServeHTTP(w, req)
-               return
-       }
-
-       m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
-       if len(m) != 2 {
-               // Not a collection PDH GET request
-               m = collectionRe.FindStringSubmatch(req.URL.Path)
-               clusterId := ""
-
-               if len(m) > 0 {
-                       clusterId = m[2]
-               }
-
-               if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
-                       // request for remote collection by uuid
-                       h.handler.remoteClusterRequest(clusterId, w, req,
-                               rewriteSignaturesClusterId{clusterId, ""}.rewriteSignatures)
-                       return
-               }
-               // not a collection UUID request, or it is a request
-               // for a local UUID, either way, continue down the
-               // handler stack.
-               h.next.ServeHTTP(w, req)
-               return
-       }
-
-       // Request for collection by PDH.  Search the federation.
-
-       // First, query the local cluster.
-       if h.handler.localClusterRequest(w, req, filterLocalClusterResponse) {
-               return
-       }
-
-       sharedContext, cancelFunc := context.WithCancel(req.Context())
-       defer cancelFunc()
-       req = req.WithContext(sharedContext)
-
-       // Create a goroutine for each cluster in the
-       // RemoteClusters map.  The first valid result gets
-       // returned to the client.  When that happens, all
-       // other outstanding requests are cancelled or
-       // suppressed.
-       sentResponse := false
-       mtx := sync.Mutex{}
-       wg := sync.WaitGroup{}
-       var errors []string
-       var errorCode int = 404
-
-       // use channel as a semaphore to limit the number of concurrent
-       // requests at a time
-       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-       defer close(sem)
-       for remoteID := range h.handler.Cluster.RemoteClusters {
-               // blocks until it can put a value into the
-               // channel (which has a max queue capacity)
-               sem <- true
-               if sentResponse {
-                       break
-               }
-               search := &searchRemoteClusterForPDH{m[1], remoteID, &mtx, &sentResponse,
-                       &sharedContext, cancelFunc, &errors, &errorCode}
-               wg.Add(1)
-               go func() {
-                       h.handler.remoteClusterRequest(search.remoteID, w, req, search.filterRemoteClusterResponse)
-                       wg.Done()
-                       <-sem
-               }()
-       }
-       wg.Wait()
-
-       if sentResponse {
-               return
-       }
-
-       // No successful responses, so return the error
-       httpserver.Errors(w, errors, errorCode)
-}
-
 func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
        mux := http.NewServeMux()
-       mux.Handle("/arvados/v1/workflows", &genericFederatedRequestHandler{next, h, wfRe})
-       mux.Handle("/arvados/v1/workflows/", &genericFederatedRequestHandler{next, h, wfRe})
-       mux.Handle("/arvados/v1/containers", &genericFederatedRequestHandler{next, h, containersRe})
-       mux.Handle("/arvados/v1/containers/", &genericFederatedRequestHandler{next, h, containersRe})
-       mux.Handle("/arvados/v1/container_requests", &genericFederatedRequestHandler{next, h, containerRequestsRe})
-       mux.Handle("/arvados/v1/container_requests/", &genericFederatedRequestHandler{next, h, containerRequestsRe})
-       mux.Handle("/arvados/v1/collections", next)
-       mux.Handle("/arvados/v1/collections/", &collectionFederatedRequestHandler{next, h})
+
+       wfHandler := &genericFederatedRequestHandler{next, h, wfRe, nil}
+       containersHandler := &genericFederatedRequestHandler{next, h, containersRe, nil}
+       containerRequestsHandler := &genericFederatedRequestHandler{next, h, containerRequestsRe,
+               []federatedRequestDelegate{remoteContainerRequestCreate}}
+       collectionsRequestsHandler := &genericFederatedRequestHandler{next, h, collectionsRe,
+               []federatedRequestDelegate{fetchRemoteCollectionByUUID, fetchRemoteCollectionByPDH}}
+       linksRequestsHandler := &genericFederatedRequestHandler{next, h, linksRe, nil}
+
+       mux.Handle("/arvados/v1/workflows", wfHandler)
+       mux.Handle("/arvados/v1/workflows/", wfHandler)
+       mux.Handle("/arvados/v1/containers", containersHandler)
+       mux.Handle("/arvados/v1/containers/", containersHandler)
+       mux.Handle("/arvados/v1/container_requests", containerRequestsHandler)
+       mux.Handle("/arvados/v1/container_requests/", containerRequestsHandler)
+       mux.Handle("/arvados/v1/collections", collectionsRequestsHandler)
+       mux.Handle("/arvados/v1/collections/", collectionsRequestsHandler)
+       mux.Handle("/arvados/v1/links", linksRequestsHandler)
+       mux.Handle("/arvados/v1/links/", linksRequestsHandler)
        mux.Handle("/", next)
 
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@@ -718,68 +137,157 @@ type CurrentUser struct {
        UUID          string
 }
 
-func (h *Handler) validateAPItoken(req *http.Request, user *CurrentUser) error {
+// validateAPItoken extracts the token from the provided http request,
+// checks it against the api_client_authorizations table in the database,
+// and fills in the token scope and user UUID.  Does not handle remote
+// tokens unless they are already in the database and not expired.
+func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, error) {
+       user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}}
        db, err := h.db(req)
        if err != nil {
-               return err
+               return nil, err
+       }
+
+       var uuid string
+       if strings.HasPrefix(token, "v2/") {
+               sp := strings.Split(token, "/")
+               uuid = sp[1]
+               token = sp[2]
+       }
+       user.Authorization.APIToken = token
+       var scopes string
+       err = db.QueryRowContext(req.Context(), `SELECT api_client_authorizations.uuid, api_client_authorizations.scopes, users.uuid FROM api_client_authorizations JOIN users on api_client_authorizations.user_id=users.id WHERE api_token=$1 AND (expires_at IS NULL OR expires_at > current_timestamp) LIMIT 1`, token).Scan(&user.Authorization.UUID, &scopes, &user.UUID)
+       if err != nil {
+               return nil, err
+       }
+       if uuid != "" && user.Authorization.UUID != uuid {
+               return nil, fmt.Errorf("UUID embedded in v2 token did not match record")
+       }
+       err = json.Unmarshal([]byte(scopes), &user.Authorization.Scopes)
+       if err != nil {
+               return nil, err
+       }
+       return &user, nil
+}
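
A v2 token embeds the authorization UUID alongside the secret, which is what the split above relies on (illustrative values):

    // "v2/<authorization uuid>/<secret>"
    sp := strings.Split("v2/zzzzz-gj3su-0123456789abcde/itsasecret0000000", "/")
    uuid, secret := sp[1], sp[2] // uuid must match the row looked up by secret
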
+
+func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) {
+       db, err := h.db(req)
+       if err != nil {
+               return nil, err
+       }
+       rd, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
+       if err != nil {
+               return nil, err
+       }
+       uuid := fmt.Sprintf("%v-gj3su-%v", h.Cluster.ClusterID, rd)
+       token, err := randutil.String(50, "abcdefghijklmnopqrstuvwxyz0123456789")
+       if err != nil {
+               return nil, err
+       }
+       if len(scopes) == 0 {
+               scopes = append(scopes, "all")
+       }
+       scopesjson, err := json.Marshal(scopes)
+       if err != nil {
+               return nil, err
+       }
+       _, err = db.ExecContext(req.Context(),
+               `INSERT INTO api_client_authorizations
+(uuid, api_token, expires_at, scopes,
+user_id,
+api_client_id, created_at, updated_at)
+VALUES ($1, $2, CURRENT_TIMESTAMP + INTERVAL '2 weeks', $3,
+(SELECT id FROM users WHERE users.uuid=$4 LIMIT 1),
+0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`,
+               uuid, token, string(scopesjson), userUUID)
+
+       if err != nil {
+               return nil, err
        }
-       return db.QueryRowContext(req.Context(), `SELECT api_client_authorizations.uuid, users.uuid FROM api_client_authorizations JOIN users on api_client_authorizations.user_id=users.id WHERE api_token=$1 AND (expires_at IS NULL OR expires_at > current_timestamp) LIMIT 1`, user.Authorization.APIToken).Scan(&user.Authorization.UUID, &user.UUID)
+
+       return &arvados.APIClientAuthorization{
+               UUID:      uuid,
+               APIToken:  token,
+               ExpiresAt: "",
+               Scopes:    scopes}, nil
 }
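
The returned fields compose into a v2 token the same way validateAPItoken takes one apart (the TokenV2 helper in sdk/go/arvados is assumed to do the equivalent):

    tokenV2 := fmt.Sprintf("v2/%s/%s", auth.UUID, auth.APIToken)
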
 
 // Extract the auth token supplied in req, and replace it with a
 // salted token for the remote cluster.
-func (h *Handler) saltAuthToken(req *http.Request, remote string) error {
+func (h *Handler) saltAuthToken(req *http.Request, remote string) (updatedReq *http.Request, err error) {
+       updatedReq = (&http.Request{
+               Method:        req.Method,
+               URL:           req.URL,
+               Header:        req.Header,
+               Body:          req.Body,
+               ContentLength: req.ContentLength,
+               Host:          req.Host,
+       }).WithContext(req.Context())
+
        creds := auth.NewCredentials()
-       creds.LoadTokensFromHTTPRequest(req)
-       if len(creds.Tokens) == 0 && req.Header.Get("Content-Type") == "application/x-www-form-encoded" {
+       creds.LoadTokensFromHTTPRequest(updatedReq)
+       if len(creds.Tokens) == 0 && updatedReq.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
                // Override ParseForm's 10MiB limit by ensuring
                // req.Body is a *http.maxBytesReader.
-               req.Body = http.MaxBytesReader(nil, req.Body, 1<<28) // 256MiB. TODO: use MaxRequestSize from discovery doc or config.
-               if err := creds.LoadTokensFromHTTPRequestBody(req); err != nil {
-                       return err
+               updatedReq.Body = http.MaxBytesReader(nil, updatedReq.Body, 1<<28) // 256MiB. TODO: use MaxRequestSize from discovery doc or config.
+               if err := creds.LoadTokensFromHTTPRequestBody(updatedReq); err != nil {
+                       return nil, err
                }
                // Replace req.Body with a buffer that re-encodes the
                // form without api_token, in case we end up
                // forwarding the request.
-               if req.PostForm != nil {
-                       req.PostForm.Del("api_token")
+               if updatedReq.PostForm != nil {
+                       updatedReq.PostForm.Del("api_token")
                }
-               req.Body = ioutil.NopCloser(bytes.NewBufferString(req.PostForm.Encode()))
+               updatedReq.Body = ioutil.NopCloser(bytes.NewBufferString(updatedReq.PostForm.Encode()))
        }
        if len(creds.Tokens) == 0 {
-               return nil
+               return updatedReq, nil
        }
+
        token, err := auth.SaltToken(creds.Tokens[0], remote)
+
        if err == auth.ErrObsoleteToken {
                // If the token exists in our own database, salt it
                // for the remote. Otherwise, assume it was issued by
                // the remote, and pass it through unmodified.
-               currentUser := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: creds.Tokens[0]}}
-               err = h.validateAPItoken(req, &currentUser)
+               currentUser, err := h.validateAPItoken(req, creds.Tokens[0])
                if err == sql.ErrNoRows {
                        // Not ours; pass through unmodified.
-                       token = currentUser.Authorization.APIToken
+                       token = creds.Tokens[0]
                } else if err != nil {
-                       return err
+                       return nil, err
                } else {
                        // Found; make V2 version and salt it.
                        token, err = auth.SaltToken(currentUser.Authorization.TokenV2(), remote)
                        if err != nil {
-                               return err
+                               return nil, err
                        }
                }
        } else if err != nil {
-               return err
+               return nil, err
+       }
+       updatedReq.Header = http.Header{}
+       for k, v := range req.Header {
+               if k != "Authorization" {
+                       updatedReq.Header[k] = v
+               }
        }
-       req.Header.Set("Authorization", "Bearer "+token)
+       updatedReq.Header.Set("Authorization", "Bearer "+token)
 
        // Remove api_token=... from the query string, in case we
        // end up forwarding the request.
-       if values, err := url.ParseQuery(req.URL.RawQuery); err != nil {
-               return err
+       if values, err := url.ParseQuery(updatedReq.URL.RawQuery); err != nil {
+               return nil, err
        } else if _, ok := values["api_token"]; ok {
                delete(values, "api_token")
-               req.URL.RawQuery = values.Encode()
+               updatedReq.URL = &url.URL{
+                       Scheme:   req.URL.Scheme,
+                       Host:     req.URL.Host,
+                       Path:     req.URL.Path,
+                       RawPath:  req.URL.RawPath,
+                       RawQuery: values.Encode(),
+               }
        }
-       return nil
+       return updatedReq, nil
 }
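
For context, salting replaces the v2 token's secret with a value bound to the remote cluster, so the remote receives a verifiable credential without learning the original secret. A hedged sketch of the shape of that step (the real construction lives in sdk/go/auth.SaltToken; HMAC-SHA1 keyed by the secret is an assumption here):

    import (
            "crypto/hmac"
            "crypto/sha1"
            "fmt"
    )

    // saltedSecret sketches the salting step; the result is forwarded
    // as "v2/<uuid>/<salted secret>".
    func saltedSecret(secret, remoteID string) string {
            mac := hmac.New(sha1.New, []byte(secret))
            mac.Write([]byte(remoteID))
            return fmt.Sprintf("%x", mac.Sum(nil))
    }
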
index 23d5d7ca768111efd050861757e07879f91d7b05..db39029bed532919c6efab0c357df4545d08f21a 100644 (file)
@@ -5,8 +5,10 @@
 package controller
 
 import (
+       "bytes"
        "encoding/json"
        "fmt"
+       "io"
        "io/ioutil"
        "net/http"
        "net/http/httptest"
@@ -90,6 +92,10 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
 }
 
 func (s *FederationSuite) remoteMockHandler(w http.ResponseWriter, req *http.Request) {
+       b := &bytes.Buffer{}
+       io.Copy(b, req.Body)
+       req.Body.Close()
+       req.Body = ioutil.NopCloser(b)
        s.remoteMockRequests = append(s.remoteMockRequests, *req)
 }
 
@@ -341,6 +347,8 @@ func (s *FederationSuite) TestGetLocalCollection(c *check.C) {
        s.testHandler.Cluster.NodeProfiles["*"] = np
        s.testHandler.NodeProfile = &np
 
+       // HTTP GET
+
        req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementCollection, nil)
        req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
        resp := s.testRequest(req)
@@ -352,6 +360,23 @@ func (s *FederationSuite) TestGetLocalCollection(c *check.C) {
        c.Check(col.ManifestText, check.Matches,
                `\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+A[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
 `)
+
+       // HTTP POST with _method=GET as a form parameter
+
+       req = httptest.NewRequest("POST", "/arvados/v1/collections/"+arvadostest.UserAgreementCollection, bytes.NewBufferString((url.Values{
+               "_method": {"GET"},
+       }).Encode()))
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+       req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8")
+       resp = s.testRequest(req)
+
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+       col = arvados.Collection{}
+       c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
+       c.Check(col.UUID, check.Equals, arvadostest.UserAgreementCollection)
+       c.Check(col.ManifestText, check.Matches,
+               `\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+A[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
+`)
 }
 
 func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
@@ -567,6 +592,104 @@ func (s *FederationSuite) TestCreateRemoteContainerRequest(c *check.C) {
        c.Check(strings.HasPrefix(cr.UUID, "zzzzz-"), check.Equals, true)
 }
 
+func (s *FederationSuite) TestCreateRemoteContainerRequestCheckRuntimeToken(c *check.C) {
+       // Send request to zmock and check that outgoing request has
+       // runtime_token set with a new random v2 token.
+
+       defer s.localServiceReturns404(c).Close()
+       // pass cluster_id via query parameter, this allows arvados-controller
+       // to avoid parsing the body
+       req := httptest.NewRequest("POST", "/arvados/v1/container_requests?cluster_id=zmock",
+               strings.NewReader(`{
+  "container_request": {
+    "name": "hello world",
+    "state": "Uncommitted",
+    "output_path": "/",
+    "container_image": "123",
+    "command": ["abc"]
+  }
+}
+`))
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2)
+       req.Header.Set("Content-type", "application/json")
+
+       np := arvados.NodeProfile{
+               Controller: arvados.SystemServiceInstance{Listen: ":"},
+               RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
+                       TLS: true, Insecure: true}}
+       s.testHandler.Cluster.ClusterID = "zzzzz"
+       s.testHandler.Cluster.NodeProfiles["*"] = np
+       s.testHandler.NodeProfile = &np
+
+       resp := s.testRequest(req)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+       var cr struct {
+               arvados.ContainerRequest `json:"container_request"`
+       }
+       c.Check(json.NewDecoder(s.remoteMockRequests[0].Body).Decode(&cr), check.IsNil)
+       c.Check(strings.HasPrefix(cr.ContainerRequest.RuntimeToken, "v2/zzzzz-gj3su-"), check.Equals, true)
+       c.Check(cr.ContainerRequest.RuntimeToken, check.Not(check.Equals), arvadostest.ActiveTokenV2)
+}
+
+func (s *FederationSuite) TestCreateRemoteContainerRequestCheckSetRuntimeToken(c *check.C) {
+       // Send request to zmock and check that outgoing request has
+       // runtime_token set with the explicitly provided token.
+
+       defer s.localServiceReturns404(c).Close()
+       // pass cluster_id via query parameter, this allows arvados-controller
+       // to avoid parsing the body
+       req := httptest.NewRequest("POST", "/arvados/v1/container_requests?cluster_id=zmock",
+               strings.NewReader(`{
+  "container_request": {
+    "name": "hello world",
+    "state": "Uncommitted",
+    "output_path": "/",
+    "container_image": "123",
+    "command": ["abc"],
+    "runtime_token": "xyz"
+  }
+}
+`))
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+       req.Header.Set("Content-type", "application/json")
+       resp := s.testRequest(req)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+       var cr struct {
+               arvados.ContainerRequest `json:"container_request"`
+       }
+       c.Check(json.NewDecoder(s.remoteMockRequests[0].Body).Decode(&cr), check.IsNil)
+       c.Check(cr.ContainerRequest.RuntimeToken, check.Equals, "xyz")
+}
+
+func (s *FederationSuite) TestCreateRemoteContainerRequestRuntimeTokenFromAuth(c *check.C) {
+       // Send request to zmock and check that outgoing request has
+       // runtime_token set using the Auth token because the user is remote.
+
+       defer s.localServiceReturns404(c).Close()
+       // pass cluster_id via query parameter, this allows arvados-controller
+       // to avoid parsing the body
+       req := httptest.NewRequest("POST", "/arvados/v1/container_requests?cluster_id=zmock",
+               strings.NewReader(`{
+  "container_request": {
+    "name": "hello world",
+    "state": "Uncommitted",
+    "output_path": "/",
+    "container_image": "123",
+    "command": ["abc"]
+  }
+}
+`))
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2+"/zzzzz-dz642-parentcontainer")
+       req.Header.Set("Content-type", "application/json")
+       resp := s.testRequest(req)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+       var cr struct {
+               arvados.ContainerRequest `json:"container_request"`
+       }
+       c.Check(json.NewDecoder(s.remoteMockRequests[0].Body).Decode(&cr), check.IsNil)
+       c.Check(cr.ContainerRequest.RuntimeToken, check.Equals, arvadostest.ActiveTokenV2)
+}
+
 func (s *FederationSuite) TestCreateRemoteContainerRequestError(c *check.C) {
        defer s.localServiceReturns404(c).Close()
        // pass cluster_id via query parameter, this allows arvados-controller
index 0c31815cba21f2869e7ae4ddf73c880bf4d0a5c8..295dde7ca42821b1c8f904eec42ac7e7764812fa 100644 (file)
@@ -5,6 +5,7 @@
 package controller
 
 import (
+       "context"
        "database/sql"
        "errors"
        "net"
@@ -49,6 +50,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
                        req.URL.Path = strings.Replace(req.URL.Path, "//", "/", -1)
                }
        }
+       if h.Cluster.HTTPRequestTimeout > 0 {
+               ctx, cancel := context.WithDeadline(req.Context(), time.Now().Add(time.Duration(h.Cluster.HTTPRequestTimeout)))
+               req = req.WithContext(ctx)
+               defer cancel()
+       }
+
        h.handlerStack.ServeHTTP(w, req)
 }
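
Moving the deadline here means one context bounds the whole request, including any proxied hops, since proxy.Do now reuses reqIn.Context() (see below). The same wiring in middleware form, as a sketch:

    func withTimeout(next http.Handler, d time.Duration) http.Handler {
            return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
                    ctx, cancel := context.WithDeadline(req.Context(), time.Now().Add(d))
                    defer cancel()
                    next.ServeHTTP(w, req.WithContext(ctx))
            })
    }
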
 
@@ -83,8 +90,7 @@ func (h *Handler) setup() {
        h.insecureClient = &ic
 
        h.proxy = &proxy{
-               Name:           "arvados-controller",
-               RequestTimeout: time.Duration(h.Cluster.HTTPRequestTimeout),
+               Name: "arvados-controller",
        }
 }
 
@@ -121,14 +127,10 @@ func prepend(next http.Handler, middleware middlewareFunc) http.Handler {
        })
 }
 
-// localClusterRequest sets up a request so it can be proxied to the
-// local API server using proxy.Do().  Returns true if a response was
-// written, false if not.
-func (h *Handler) localClusterRequest(w http.ResponseWriter, req *http.Request, filter ResponseFilter) bool {
+func (h *Handler) localClusterRequest(req *http.Request) (*http.Response, error) {
        urlOut, insecure, err := findRailsAPI(h.Cluster, h.NodeProfile)
        if err != nil {
-               httpserver.Error(w, err.Error(), http.StatusInternalServerError)
-               return true
+               return nil, err
        }
        urlOut = &url.URL{
                Scheme:   urlOut.Scheme,
@@ -141,12 +143,14 @@ func (h *Handler) localClusterRequest(w http.ResponseWriter, req *http.Request,
        if insecure {
                client = h.insecureClient
        }
-       return h.proxy.Do(w, req, urlOut, client, filter)
+       return h.proxy.Do(req, urlOut, client)
 }
 
 func (h *Handler) proxyRailsAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
-       if !h.localClusterRequest(w, req, nil) && next != nil {
-               next.ServeHTTP(w, req)
+       resp, err := h.localClusterRequest(req)
+       n, err := h.proxy.ForwardResponse(w, resp, err)
+       if err != nil {
+               httpserver.Logger(req).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
        }
 }
 
index 963fd1159415e16d93e676401021e974c287ad69..746b9242f2198ee3c3000c808771047d4aa1c77c 100644 (file)
@@ -130,3 +130,39 @@ func (s *HandlerSuite) TestProxyRedirect(c *check.C) {
        c.Check(resp.Code, check.Equals, http.StatusFound)
        c.Check(resp.Header().Get("Location"), check.Matches, `https://0.0.0.0:1/auth/joshid\?return_to=foo&?`)
 }
+
+func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
+       req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
+       user, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveToken)
+       c.Assert(err, check.IsNil)
+       c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
+       c.Check(user.Authorization.APIToken, check.Equals, arvadostest.ActiveToken)
+       c.Check(user.Authorization.Scopes, check.DeepEquals, []string{"all"})
+       c.Check(user.UUID, check.Equals, arvadostest.ActiveUserUUID)
+}
+
+func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) {
+       req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
+       user, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveTokenV2)
+       c.Assert(err, check.IsNil)
+       c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
+       c.Check(user.Authorization.APIToken, check.Equals, arvadostest.ActiveToken)
+       c.Check(user.Authorization.Scopes, check.DeepEquals, []string{"all"})
+       c.Check(user.UUID, check.Equals, arvadostest.ActiveUserUUID)
+       c.Check(user.Authorization.TokenV2(), check.Equals, arvadostest.ActiveTokenV2)
+}
+
+func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
+       req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
+       auth, err := s.handler.(*Handler).createAPItoken(req, arvadostest.ActiveUserUUID, nil)
+       c.Assert(err, check.IsNil)
+       c.Check(auth.Scopes, check.DeepEquals, []string{"all"})
+
+       user, err := s.handler.(*Handler).validateAPItoken(req, auth.TokenV2())
+       c.Assert(err, check.IsNil)
+       c.Check(user.Authorization.UUID, check.Equals, auth.UUID)
+       c.Check(user.Authorization.APIToken, check.Equals, auth.APIToken)
+       c.Check(user.Authorization.Scopes, check.DeepEquals, []string{"all"})
+       c.Check(user.UUID, check.Equals, arvadostest.ActiveUserUUID)
+       c.Check(user.Authorization.TokenV2(), check.Equals, auth.TokenV2())
+}
index 951cb9d25fe24ba74a5697d54187847cfc84ae1a..c01c152352e6b8f101179bf38add3b0574a00c5d 100644 (file)
@@ -5,18 +5,24 @@
 package controller
 
 import (
-       "context"
        "io"
        "net/http"
        "net/url"
-       "time"
 
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
 )
 
 type proxy struct {
-       Name           string // to use in Via header
-       RequestTimeout time.Duration
+       Name string // to use in Via header
+}
+
+type HTTPError struct {
+       Message string
+       Code    int
+}
+
+func (h HTTPError) Error() string {
+       return h.Message
 }
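
HTTPError lets upstream code choose the downstream status; ForwardResponse below unwraps it with a type assertion and falls back to 502 Bad Gateway for other errors. Usage, as in remoteClusterRequest above:

    return nil, HTTPError{"no proxy available for cluster zmock", http.StatusNotFound}
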
 
 // headers that shouldn't be forwarded when proxying. See
@@ -36,15 +42,11 @@ var dropHeaders = map[string]bool{
 
 type ResponseFilter func(*http.Response, error) (*http.Response, error)
 
-// Do sends a request, passes the result to the filter (if provided)
-// and then if the result is not suppressed by the filter, sends the
-// request to the ResponseWriter.  Returns true if a response was written,
-// false if not.
-func (p *proxy) Do(w http.ResponseWriter,
+// Forward a request to the upstream service and return the response or error.
+func (p *proxy) Do(
        reqIn *http.Request,
        urlOut *url.URL,
-       client *http.Client,
-       filter ResponseFilter) bool {
+       client *http.Client) (*http.Response, error) {
 
        // Copy headers from incoming request, then add/replace proxy
        // headers like Via and X-Forwarded-For.
@@ -64,65 +66,35 @@ func (p *proxy) Do(w http.ResponseWriter,
        }
        hdrOut.Add("Via", reqIn.Proto+" arvados-controller")
 
-       ctx := reqIn.Context()
-       if p.RequestTimeout > 0 {
-               var cancel context.CancelFunc
-               ctx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Duration(p.RequestTimeout)))
-               defer cancel()
-       }
-
        reqOut := (&http.Request{
                Method: reqIn.Method,
                URL:    urlOut,
                Host:   reqIn.Host,
                Header: hdrOut,
                Body:   reqIn.Body,
-       }).WithContext(ctx)
+       }).WithContext(reqIn.Context())
 
        resp, err := client.Do(reqOut)
-       if filter == nil && err != nil {
-               httpserver.Error(w, err.Error(), http.StatusBadGateway)
-               return true
-       }
-
-       // make sure original response body gets closed
-       var originalBody io.ReadCloser
-       if resp != nil {
-               originalBody = resp.Body
-               if originalBody != nil {
-                       defer originalBody.Close()
-               }
-       }
-
-       if filter != nil {
-               resp, err = filter(resp, err)
+       return resp, err
+}
 
-               if err != nil {
+// ForwardResponse copies a response (or error) to the downstream client.
+func (p *proxy) ForwardResponse(w http.ResponseWriter, resp *http.Response, err error) (int64, error) {
+       if err != nil {
+               if he, ok := err.(HTTPError); ok {
+                       httpserver.Error(w, he.Message, he.Code)
+               } else {
                        httpserver.Error(w, err.Error(), http.StatusBadGateway)
-                       return true
-               }
-               if resp == nil {
-                       // filter() returned a nil response, this means suppress
-                       // writing a response, for the case where there might
-                       // be multiple response writers.
-                       return false
-               }
-
-               // the filter gave us a new response body, make sure that gets closed too.
-               if resp.Body != originalBody {
-                       defer resp.Body.Close()
                }
+               return 0, nil
        }
 
+       defer resp.Body.Close()
        for k, v := range resp.Header {
                for _, v := range v {
                        w.Header().Add(k, v)
                }
        }
        w.WriteHeader(resp.StatusCode)
-       n, err := io.Copy(w, resp.Body)
-       if err != nil {
-               httpserver.Logger(reqIn).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
-       }
-       return true
+       return io.Copy(w, resp.Body)
 }
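
With the timeout and filter logic gone, callers now pair the two calls themselves: Do fetches the upstream response without writing anything, and ForwardResponse copies it (or the error) downstream. A minimal sketch of the new pattern follows; the handler type and upstream URL are hypothetical, not part of this change:

	func (h *exampleHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
		remote, err := url.Parse("https://zzzzz.example.org/arvados/v1/collections")
		if err != nil {
			httpserver.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		// Do returns the response (or error) instead of writing it, so the
		// caller can inspect, retry, or replace it before forwarding.
		resp, err := h.proxy.Do(req, remote, http.DefaultClient)
		// An HTTPError's Code becomes the downstream status; any other
		// error is reported as 502 Bad Gateway.
		h.proxy.ForwardResponse(w, resp, err)
	}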
index ec0239eb37bf0a45bb715b35eab757c6c94850d5..17cff235db82fba55fa12c6ff08fe0a114dff27b 100644 (file)
@@ -6,8 +6,10 @@ package arvados
 
 // APIClientAuthorization is an arvados#apiClientAuthorization resource.
 type APIClientAuthorization struct {
-       UUID     string `json:"uuid"`
-       APIToken string `json:"api_token"`
+       UUID      string   `json:"uuid,omitempty"`
+       APIToken  string   `json:"api_token,omitempty"`
+       ExpiresAt string   `json:"expires_at,omitempty"`
+       Scopes    []string `json:"scopes,omitempty"`
 }
 
 // APIClientAuthorizationList is an arvados#apiClientAuthorizationList resource.
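
The TokenV2 method exercised by the controller tests above renders the "v2/<authorization UUID>/<secret>" form seen in the ActiveTokenV2 fixture. Its body is not shown in this hunk; a sketch consistent with that fixture would be:

	func (aca APIClientAuthorization) TokenV2() string {
		return "v2/" + aca.UUID + "/" + aca.APIToken
	}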
index def4e33cbb5f771cfbe6edbcec0d9704bab518af..02a0d76decbad272baee737282b5087a72a33c60 100644 (file)
@@ -57,6 +57,7 @@ type ContainerRequest struct {
        UseExisting             bool                   `json:"use_existing"`
        LogUUID                 string                 `json:"log_uuid"`
        OutputUUID              string                 `json:"output_uuid"`
+       RuntimeToken            string                 `json:"runtime_token"`
 }
 
 // Mount is special behavior to attach to a filesystem path or device.
index 301f0b48bede74b63f1141b7b295e346824c3888..9ae0fc3a5f4dc2a1e674325b5c4d9f86f19e5afa 100644 (file)
@@ -21,6 +21,7 @@ type keepBackend struct {
 type keepClient interface {
        ReadAt(locator string, p []byte, off int) (int, error)
        PutB(p []byte) (string, int, error)
+       LocalLocator(locator string) (string, error)
 }
 
 type apiClient interface {
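
Anything plugged in as a keepClient must now provide LocalLocator as well as ReadAt and PutB. A minimal stub satisfying the extended interface (a hypothetical test helper, shown only to illustrate the contract):

	type stubKeep struct{}

	func (stubKeep) ReadAt(locator string, p []byte, off int) (int, error) { return 0, io.EOF }
	func (stubKeep) PutB(p []byte) (string, int, error) {
		return fmt.Sprintf("%x+%d", md5.Sum(p), len(p)), 1, nil
	}
	// A no-op stand-in; a real client asks Keep to copy +R blocks locally.
	func (stubKeep) LocalLocator(locator string) (string, error) { return locator, nil }

	var _ keepClient = stubKeep{}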
index f6afadba5b47488dfff41a53ebca31a97e1dc940..b996542abd52cf7be04549962fdb31dfb7a366a0 100644 (file)
@@ -8,7 +8,6 @@ import (
        "encoding/json"
        "fmt"
        "io"
-       "log"
        "os"
        "path"
        "regexp"
@@ -116,14 +115,12 @@ func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime t
 }
 
 func (fs *collectionFileSystem) Sync() error {
-       log.Printf("cfs.Sync()")
        if fs.uuid == "" {
                return nil
        }
        txt, err := fs.MarshalManifest(".")
        if err != nil {
-               log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
-               return err
+               return fmt.Errorf("sync failed: %s", err)
        }
        coll := &Collection{
                UUID:         fs.uuid,
@@ -131,9 +128,9 @@ func (fs *collectionFileSystem) Sync() error {
        }
        err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, fs.UpdateBody(coll), map[string]interface{}{"select": []string{"uuid"}})
        if err != nil {
-               log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
+               return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
        }
-       return err
+       return nil
 }
 
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
@@ -549,9 +546,10 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode
        return dn.treenode.Child(name, replace)
 }
 
-// sync flushes in-memory data (for the children with the given names,
-// which must be children of dn) to persistent storage. Caller must
-// have write lock on dn and the named children.
+// sync flushes in-memory data to persistent storage and replaces
+// remote block references with local ones (for the children with the
+// given names, which must be children of dn). Caller must have write
+// lock on dn and the named children.
 func (dn *dirnode) sync(names []string) error {
        type shortBlock struct {
                fn  *filenode
@@ -588,37 +586,51 @@ func (dn *dirnode) sync(names []string) error {
                return nil
        }
 
+       localLocator := map[string]string{}
        for _, name := range names {
                fn, ok := dn.inodes[name].(*filenode)
                if !ok {
                        continue
                }
                for idx, seg := range fn.segments {
-                       seg, ok := seg.(*memSegment)
-                       if !ok {
-                               continue
-                       }
-                       if seg.Len() > maxBlockSize/2 {
-                               if err := flush([]shortBlock{{fn, idx}}); err != nil {
-                                       return err
+                       switch seg := seg.(type) {
+                       case storedSegment:
+                               loc, ok := localLocator[seg.locator]
+                               if !ok {
+                                       var err error
+                                       loc, err = dn.fs.LocalLocator(seg.locator)
+                                       if err != nil {
+                                               return err
+                                       }
+                                       localLocator[seg.locator] = loc
                                }
-                               continue
-                       }
-                       if pendingLen+seg.Len() > maxBlockSize {
-                               if err := flush(pending); err != nil {
-                                       return err
+                               seg.locator = loc
+                               fn.segments[idx] = seg
+                       case *memSegment:
+                               if seg.Len() > maxBlockSize/2 {
+                                       if err := flush([]shortBlock{{fn, idx}}); err != nil {
+                                               return err
+                                       }
+                                       continue
+                               }
+                               if pendingLen+seg.Len() > maxBlockSize {
+                                       if err := flush(pending); err != nil {
+                                               return err
+                                       }
+                                       pending = nil
+                                       pendingLen = 0
                                }
-                               pending = nil
-                               pendingLen = 0
+                               pending = append(pending, shortBlock{fn, idx})
+                               pendingLen += seg.Len()
+                       default:
+                               panic(fmt.Sprintf("can't sync segment type %T", seg))
                        }
-                       pending = append(pending, shortBlock{fn, idx})
-                       pendingLen += seg.Len()
                }
        }
        return flush(pending)
 }
 
-// caller must have read lock.
+// caller must have write lock.
 func (dn *dirnode) marshalManifest(prefix string) (string, error) {
        var streamLen int64
        type filepart struct {
@@ -630,6 +642,17 @@ func (dn *dirnode) marshalManifest(prefix string) (string, error) {
        var subdirs string
        var blocks []string
 
+       if len(dn.inodes) == 0 {
+               if prefix == "." {
+                       return "", nil
+               }
+               // Express the existence of an empty directory by
+               // adding an empty file named `\056`, which (unlike
+               // the more obvious spelling `.`) is accepted by the
+               // API's manifest validator.
+               return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
+       }
+
        names := make([]string, 0, len(dn.inodes))
        for name := range dn.inodes {
                names = append(names, name)
@@ -758,8 +781,14 @@ func (dn *dirnode) loadManifest(txt string) error {
                        }
                        name := dirname + "/" + manifestUnescape(toks[2])
                        fnode, err := dn.createFileAndParents(name)
-                       if err != nil {
-                               return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
+                       if fnode == nil && err == nil && length == 0 {
+                               // Special case: an empty file used as
+                               // a marker to preserve an otherwise
+                               // empty directory in a manifest.
+                               continue
+                       }
+                       if err != nil || (fnode == nil && length != 0) {
+                               return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
                        }
                        // Map the stream offset/range coordinates to
                        // block/offset/range coordinates and add
@@ -817,15 +846,14 @@ func (dn *dirnode) loadManifest(txt string) error {
        return nil
 }
 
-// only safe to call from loadManifest -- no locking
+// only safe to call from loadManifest -- no locking.
+//
+// If path is a "parent directory exists" marker (the last path
+// component is "."), the returned values are both nil.
 func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
        var node inode = dn
        names := strings.Split(path, "/")
        basename := names[len(names)-1]
-       if !permittedName(basename) {
-               err = fmt.Errorf("invalid file part %q in path %q", basename, path)
-               return
-       }
        for _, name := range names[:len(names)-1] {
                switch name {
                case "", ".":
@@ -856,6 +884,12 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
                        return
                }
        }
+       if basename == "." {
+               return
+       } else if !permittedName(basename) {
+               err = fmt.Errorf("invalid file part %q in path %q", basename, path)
+               return
+       }
        _, err = node.Child(basename, func(child inode) (inode, error) {
                switch child := child.(type) {
                case nil:
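
Taken together, the marshal and load sides round-trip empty directories. For example, a collection whose only content is an empty directory "emptydir" marshals to the manifest line below; on load, the zero-length file unescapes to "." and is treated as a directory marker rather than a file:

	./emptydir d41d8cd98f00b204e9800998ecf8427e+0 0:0:\056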
index 96347737f8f24dce8c56518cf28a106069ef4a28..a6d4ab1e5b71baccabafdbdf810db0ee264420a5 100644 (file)
@@ -7,6 +7,7 @@ package arvados
 import (
        "bytes"
        "crypto/md5"
+       "crypto/sha1"
        "errors"
        "fmt"
        "io"
@@ -16,6 +17,7 @@ import (
        "os"
        "regexp"
        "runtime"
+       "strings"
        "sync"
        "testing"
        "time"
@@ -27,7 +29,8 @@ import (
 var _ = check.Suite(&CollectionFSSuite{})
 
 type keepClientStub struct {
-       blocks map[string][]byte
+       blocks      map[string][]byte
+       refreshable map[string]bool
        sync.RWMutex
 }
 
@@ -53,11 +56,28 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
        return locator, 1, nil
 }
 
+var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
+
+func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
+       kcs.Lock()
+       defer kcs.Unlock()
+       if strings.Contains(locator, "+R") {
+               if len(locator) < 32 {
+                       return "", fmt.Errorf("bad locator: %q", locator)
+               }
+               if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
+                       return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
+               }
+       }
+       fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), time.Now().Add(time.Hour*24*14).Unix())
+       return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
+}
+
 type CollectionFSSuite struct {
        client *Client
        coll   Collection
        fs     CollectionFileSystem
-       kc     keepClient
+       kc     *keepClientStub
 }
 
 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
@@ -399,6 +419,37 @@ func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
        checkSize(11)
 }
 
+func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
+       foo := "foo"
+       bar := "bar"
+       hash := map[string]string{
+               foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
+               bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
+       }
+
+       fs, err := (&Collection{
+               ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
+       }).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       manifest, err := fs.MarshalManifest(".")
+       c.Check(manifest, check.Equals, "")
+       c.Check(err, check.NotNil)
+
+       s.kc.refreshable = map[string]bool{hash[bar]: true}
+
+       for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
+               fs, err = (&Collection{
+                       ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
+               }).FileSystem(s.client, s.kc)
+               c.Assert(err, check.IsNil)
+               manifest, err := fs.MarshalManifest(".")
+               c.Check(err, check.IsNil)
+               // Both blocks should now have +A signatures.
+               c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
+               c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
+       }
+}
+
 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
        maxBlockSize = 8
        defer func() { maxBlockSize = 2 << 26 }()
@@ -490,7 +541,7 @@ func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
                        f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
-                       for i := 0; i < 6502; i++ {
+                       for i := 0; i < 1024; i++ {
                                r := rand.Uint32()
                                switch {
                                case r%11 == 0:
@@ -641,6 +692,25 @@ func (s *CollectionFSSuite) TestRenameError(c *check.C) {
        c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
 }
 
+func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("foo", 0755)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("bar", 0755)
+       c.Assert(err, check.IsNil)
+       err = fs.Rename("bar", "baz")
+       c.Check(err, check.IsNil)
+       err = fs.Rename("foo", "baz")
+       c.Check(err, check.NotNil)
+       err = fs.Rename("foo", "baz/")
+       c.Check(err, check.IsNil)
+       err = fs.Rename("baz/foo", ".")
+       c.Check(err, check.Equals, ErrInvalidArgument)
+       err = fs.Rename("baz/foo/", ".")
+       c.Check(err, check.Equals, ErrInvalidArgument)
+}
+
 func (s *CollectionFSSuite) TestRename(c *check.C) {
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
@@ -791,11 +861,11 @@ func (s *CollectionFSSuite) TestPersist(c *check.C) {
        }
 }
 
-func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
+func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
        var err error
        s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
-       for _, name := range []string{"dir", "dir/zerodir", "zero", "zero/zero"} {
+       for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
                err = s.fs.Mkdir(name, 0755)
                c.Assert(err, check.IsNil)
        }
@@ -841,6 +911,23 @@ func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
                c.Check(err, check.IsNil)
                c.Check(buf, check.DeepEquals, data)
        }
+
+       expectDir := map[string]int{
+               "empty":           0,
+               "not empty":       1,
+               "not empty/empty": 0,
+       }
+       for name, expectLen := range expectDir {
+               _, err := persisted.Open(name + "/bogus")
+               c.Check(err, check.NotNil)
+
+               d, err := persisted.Open(name)
+               defer d.Close()
+               c.Check(err, check.IsNil)
+               fi, err := d.Readdir(-1)
+               c.Check(err, check.IsNil)
+               c.Check(fi, check.HasLen, expectLen)
+       }
 }
 
 func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
@@ -992,6 +1079,12 @@ func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
                ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
                ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
                ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
                "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
        } {
@@ -1007,7 +1100,9 @@ func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
        for _, txt := range []string{
                "",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
-               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
        } {
index 114faf17b74e245aeaacf72aeaaf5bb6f8e5046a..e0f2483131a98a64856116bda8c14b4de7bd7051 100644 (file)
@@ -8,6 +8,7 @@ package arvadostest
 const (
        SpectatorToken          = "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu"
        ActiveToken             = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+       ActiveTokenUUID         = "zzzzz-gj3su-077z32aux8dg2s1"
        ActiveTokenV2           = "v2/zzzzz-gj3su-077z32aux8dg2s1/3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
        AdminToken              = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
        AnonymousToken          = "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi"
index 6452136d85eede6896f1dca1648e00b4ba6ae8e7..14d89873b60f7d902a39a6b337eea78e8040d0c3 100644 (file)
@@ -12,6 +12,10 @@ import (
        "time"
 )
 
+const (
+       HeaderRequestID = "X-Request-Id"
+)
+
 // IDGenerator generates alphanumeric strings suitable for use as
 // unique IDs (a given IDGenerator will never return the same ID
 // twice).
@@ -44,11 +48,11 @@ func (g *IDGenerator) Next() string {
 func AddRequestIDs(h http.Handler) http.Handler {
        gen := &IDGenerator{Prefix: "req-"}
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
-               if req.Header.Get("X-Request-Id") == "" {
+               if req.Header.Get(HeaderRequestID) == "" {
                        if req.Header == nil {
                                req.Header = http.Header{}
                        }
-                       req.Header.Set("X-Request-Id", gen.Next())
+                       req.Header.Set(HeaderRequestID, gen.Next())
                }
                h.ServeHTTP(w, req)
        })
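
AddRequestIDs is intended to wrap an entire handler tree, so every request carries an ID by the time application code sees it. A sketch of typical wiring (the mux, route, and port are hypothetical):

	mux := http.NewServeMux()
	mux.HandleFunc("/status", func(w http.ResponseWriter, req *http.Request) {
		// Either the client-supplied value or a generated "req-..." ID.
		fmt.Fprintln(w, req.Header.Get(httpserver.HeaderRequestID))
	})
	log.Fatal(http.ListenAndServe(":8080", httpserver.AddRequestIDs(mux)))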
index 169f1457e2e06e6e3424856809c92fc5dc74d4f9..ab610d65e71453ba8abc2287c5321a6a41dee217 100644 (file)
@@ -198,9 +198,9 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error
        }
 }
 
-func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) {
+func (kc *KeepClient) getOrHead(method string, locator string, header http.Header) (io.ReadCloser, int64, string, http.Header, error) {
        if strings.HasPrefix(locator, "d41d8cd98f00b204e9800998ecf8427e+0") {
-               return ioutil.NopCloser(bytes.NewReader(nil)), 0, "", nil
+               return ioutil.NopCloser(bytes.NewReader(nil)), 0, "", nil, nil
        }
 
        reqid := kc.getRequestID()
@@ -237,8 +237,15 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i
                                errs = append(errs, fmt.Sprintf("%s: %v", url, err))
                                continue
                        }
-                       req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
-                       req.Header.Add("X-Request-Id", reqid)
+                       for k, v := range header {
+                               req.Header[k] = append([]string(nil), v...)
+                       }
+                       if req.Header.Get("Authorization") == "" {
+                               req.Header.Set("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
+                       }
+                       if req.Header.Get("X-Request-Id") == "" {
+                               req.Header.Set("X-Request-Id", reqid)
+                       }
                        resp, err := kc.httpClient().Do(req)
                        if err != nil {
                                // Probably a network error, may be transient,
@@ -269,12 +276,12 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i
                        if expectLength < 0 {
                                if resp.ContentLength < 0 {
                                        resp.Body.Close()
-                                       return nil, 0, "", fmt.Errorf("error reading %q: no size hint, no Content-Length header in response", locator)
+                                       return nil, 0, "", nil, fmt.Errorf("error reading %q: no size hint, no Content-Length header in response", locator)
                                }
                                expectLength = resp.ContentLength
                        } else if resp.ContentLength >= 0 && expectLength != resp.ContentLength {
                                resp.Body.Close()
-                               return nil, 0, "", fmt.Errorf("error reading %q: size hint %d != Content-Length %d", locator, expectLength, resp.ContentLength)
+                               return nil, 0, "", nil, fmt.Errorf("error reading %q: size hint %d != Content-Length %d", locator, expectLength, resp.ContentLength)
                        }
                        // Success
                        if method == "GET" {
@@ -282,10 +289,10 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i
                                        Reader: resp.Body,
                                        Hash:   md5.New(),
                                        Check:  locator[0:32],
-                               }, expectLength, url, nil
+                               }, expectLength, url, resp.Header, nil
                        } else {
                                resp.Body.Close()
-                               return nil, expectLength, url, nil
+                               return nil, expectLength, url, resp.Header, nil
                        }
                }
                serversToTry = retryList
@@ -301,7 +308,29 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i
                        isTemp: len(serversToTry) > 0,
                }}
        }
-       return nil, 0, "", err
+       return nil, 0, "", nil, err
+}
+
+// LocalLocator returns a locator equivalent to the one supplied, but
+// with a valid signature from the local cluster. If the given locator
+// already has a local signature, it is returned unchanged.
+func (kc *KeepClient) LocalLocator(locator string) (string, error) {
+       if !strings.Contains(locator, "+R") {
+               // Either it has +A, or it's unsigned and we assume
+               // it's a local locator on a site with signatures
+               // disabled.
+               return locator, nil
+       }
+       sighdr := fmt.Sprintf("local, time=%s", time.Now().UTC().Format(time.RFC3339))
+       _, _, url, hdr, err := kc.getOrHead("HEAD", locator, http.Header{"X-Keep-Signature": []string{sighdr}})
+       if err != nil {
+               return "", err
+       }
+       loc := hdr.Get("X-Keep-Locator")
+       if loc == "" {
+               return "", fmt.Errorf("missing X-Keep-Locator header in HEAD response from %s", url)
+       }
+       return loc, nil
 }
 
 // Get() retrieves a block, given a locator. Returns a reader, the
@@ -312,7 +341,8 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i
 // reader returned by this method will return a BadChecksum error
 // instead of EOF.
 func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
-       return kc.getOrHead("GET", locator)
+       rdr, size, url, _, err := kc.getOrHead("GET", locator, nil)
+       return rdr, size, url, err
 }
 
 // ReadAt() retrieves a portion of block from the cache if it's
@@ -329,7 +359,7 @@ func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
 // Returns the data size (content length) reported by the Keep service
 // and the URI reporting the data size.
 func (kc *KeepClient) Ask(locator string) (int64, string, error) {
-       _, size, url, err := kc.getOrHead("HEAD", locator)
+       _, size, url, _, err := kc.getOrHead("HEAD", locator, nil)
        return size, url, err
 }
 
@@ -516,31 +546,26 @@ func (kc *KeepClient) httpClient() HTTPClient {
                keepAlive = DefaultKeepAlive
        }
 
-       transport, ok := http.DefaultTransport.(*http.Transport)
-       if ok {
-               copy := *transport
-               transport = &copy
-       } else {
-               // Evidently the application has replaced
-               // http.DefaultTransport with a different type, so we
-               // need to build our own from scratch using the Go 1.8
-               // defaults.
-               transport = &http.Transport{
+       c := &http.Client{
+               Timeout: requestTimeout,
+               // It's not safe to copy *http.DefaultTransport
+               // because it has a mutex (which might be locked)
+               // protecting a private map (which might not be nil).
+               // So we build our own, using the Go 1.10 default
+               // values, ignoring any changes the application has
+               // made to http.DefaultTransport.
+               Transport: &http.Transport{
+                       DialContext: (&net.Dialer{
+                               Timeout:   connectTimeout,
+                               KeepAlive: keepAlive,
+                               DualStack: true,
+                       }).DialContext,
                        MaxIdleConns:          100,
                        IdleConnTimeout:       90 * time.Second,
+                       TLSHandshakeTimeout:   tlsTimeout,
                        ExpectContinueTimeout: time.Second,
-               }
-       }
-       transport.DialContext = (&net.Dialer{
-               Timeout:   connectTimeout,
-               KeepAlive: keepAlive,
-               DualStack: true,
-       }).DialContext
-       transport.TLSHandshakeTimeout = tlsTimeout
-       transport.TLSClientConfig = arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure)
-       c := &http.Client{
-               Timeout:   requestTimeout,
-               Transport: transport,
+                       TLSClientConfig:       arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure),
+               },
        }
        defaultClient[kc.Arvados.ApiInsecure][kc.foundNonDiskSvc] = c
        return c
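
A sketch of how a caller might use the new LocalLocator (the locator value is illustrative, and kc is assumed to be a configured *keepclient.KeepClient):

	local, err := kc.LocalLocator("acbd18db4cc2f85cedef654fccc4a4d8+3+Rzzzzz-fakesig@12345678")
	if err != nil {
		// e.g. the remote cluster refused, or the block could not be copied
		log.Fatal(err)
	}
	// local now carries a +A signature valid on this cluster; locators
	// without a +R hint are returned unchanged.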
index dc80ad7e1d6378ad09da968db62cf038002d0b9c..176ad65bb11bb93672e56807be3aa49822cebaf0 100644 (file)
@@ -884,6 +884,19 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
                c.Check(n, Equals, int64(len(content)))
                c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
        }
+       {
+               loc, err := kc.LocalLocator(hash)
+               c.Check(err, Equals, nil)
+               c.Assert(len(loc) >= 32, Equals, true)
+               c.Check(loc[:32], Equals, hash[:32])
+       }
+       {
+               content := []byte("the perth county conspiracy")
+               loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
+               c.Check(loc, Equals, "")
+               c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
+               c.Check(err, ErrorMatches, `.*HTTP 400.*`)
+       }
 }
 
 type StubProxyHandler struct {
index f4580f346bbed43a5642e974d54c8e5922c24efd..3281d78e209db3a0e69726d285c59b456ea93035 100644 (file)
@@ -900,6 +900,38 @@ class ArvadosFile(object):
                 return True
         return False
 
+    @synchronized
+    def has_remote_blocks(self):
+        """Returns True if any of the segment's locators has a +R signature"""
+
+        for s in self._segments:
+            if '+R' in s.locator:
+                return True
+        return False
+
+    @synchronized
+    def _copy_remote_blocks(self, remote_blocks={}):
+        """Ask Keep to copy remote blocks and point to their local copies.
+
+        This is called from the parent Collection.
+
+        :remote_blocks:
+            Shared cache of remote to local block mappings. This is used to avoid
+            doing extra work when blocks are shared by more than one file in
+            different subdirectories.
+        """
+
+        for s in self._segments:
+            if '+R' in s.locator:
+                try:
+                    loc = remote_blocks[s.locator]
+                except KeyError:
+                    loc = self.parent._my_keep().refresh_signature(s.locator)
+                    remote_blocks[s.locator] = loc
+                s.locator = loc
+                self.parent.set_committed(False)
+        return remote_blocks
+
     @synchronized
     def segments(self):
         return copy.copy(self._segments)
index e38a6bd475c7b8a4aee7787a537a40656fd93b36..48fdaf03ecd685f3e420437cebb3d46fd1741085 100644 (file)
@@ -520,6 +520,7 @@ class RichCollectionBase(CollectionBase):
     def __init__(self, parent=None):
         self.parent = parent
         self._committed = False
+        self._has_remote_blocks = False
         self._callback = None
         self._items = {}
 
@@ -544,6 +545,23 @@ class RichCollectionBase(CollectionBase):
     def stream_name(self):
         raise NotImplementedError()
 
+    @synchronized
+    def has_remote_blocks(self):
+        """Recursively check for a +R segment locator signature."""
+
+        if self._has_remote_blocks:
+            return True
+        for item in self:
+            if self[item].has_remote_blocks():
+                return True
+        return False
+
+    @synchronized
+    def set_has_remote_blocks(self, val):
+        self._has_remote_blocks = val
+        if self.parent:
+            self.parent.set_has_remote_blocks(val)
+
     @must_be_writable
     @synchronized
     def find_or_create(self, path, create_type):
@@ -832,6 +850,8 @@ class RichCollectionBase(CollectionBase):
 
         self._items[target_name] = item
         self.set_committed(False)
+        if not self._has_remote_blocks and source_obj.has_remote_blocks():
+            self.set_has_remote_blocks(True)
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
@@ -1023,6 +1043,24 @@ class RichCollectionBase(CollectionBase):
             else:
                 return self._manifest_text
 
+    @synchronized
+    def _copy_remote_blocks(self, remote_blocks={}):
+        """Scan through the entire collection and ask Keep to copy remote blocks.
+
+        When accessing a remote collection, blocks will have a remote signature
+        (+R instead of +A). Collect these signatures and request Keep to copy the
+        blocks to the local cluster, returning local (+A) signatures.
+
+        :remote_blocks:
+          Shared cache of remote to local block mappings. This is used to avoid
+          doing extra work when blocks are shared by more than one file in
+          different subdirectories.
+
+        """
+        for item in self:
+            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
+        return remote_blocks
+
     @synchronized
     def diff(self, end_collection, prefix=".", holding_collection=None):
         """Generate list of add/modify/delete actions.
@@ -1257,8 +1295,12 @@ class Collection(RichCollectionBase):
                 self._manifest_locator = manifest_locator_or_text
             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
+                if not self._has_local_collection_uuid():
+                    self._has_remote_blocks = True
             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                 self._manifest_text = manifest_locator_or_text
+                if '+R' in self._manifest_text:
+                    self._has_remote_blocks = True
             else:
                 raise errors.ArgumentError(
                     "Argument to CollectionReader is not a manifest or a collection UUID")
@@ -1376,6 +1418,10 @@ class Collection(RichCollectionBase):
     def _has_collection_uuid(self):
         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
 
+    def _has_local_collection_uuid(self):
+        return self._has_collection_uuid() and \
+            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
+
     def __enter__(self):
         return self
 
@@ -1505,8 +1551,14 @@ class Collection(RichCollectionBase):
             body["trash_at"] = t
 
         if not self.committed():
+            if self._has_remote_blocks:
+                # Copy any remote blocks to the local cluster.
+                self._copy_remote_blocks(remote_blocks={})
+                self._has_remote_blocks = False
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
+            elif not self._has_local_collection_uuid():
+                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
 
             self._my_block_manager().commit_all()
 
@@ -1591,6 +1643,11 @@ class Collection(RichCollectionBase):
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
+        if self._has_remote_blocks:
+            # Copy any remote blocks to the local cluster.
+            self._copy_remote_blocks(remote_blocks={})
+            self._has_remote_blocks = False
+
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
 
@@ -1627,6 +1684,9 @@ class Collection(RichCollectionBase):
     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
 
+    def _unescape_manifest_path(self, path):
+        return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
+
     @synchronized
     def _import_manifest(self, manifest_text):
         """Import a manifest into a `Collection`.
@@ -1651,7 +1711,7 @@ class Collection(RichCollectionBase):
 
             if state == STREAM_NAME:
                 # starting a new stream
-                stream_name = tok.replace('\\040', ' ')
+                stream_name = self._unescape_manifest_path(tok)
                 blocks = []
                 segments = []
                 streamoffset = 0
@@ -1673,13 +1733,18 @@ class Collection(RichCollectionBase):
                 if file_segment:
                     pos = int(file_segment.group(1))
                     size = int(file_segment.group(2))
-                    name = file_segment.group(3).replace('\\040', ' ')
-                    filepath = os.path.join(stream_name, name)
-                    afile = self.find_or_create(filepath, FILE)
-                    if isinstance(afile, ArvadosFile):
-                        afile.add_segment(blocks, pos, size)
+                    name = self._unescape_manifest_path(file_segment.group(3))
+                    if name.split('/')[-1] == '.':
+                        # placeholder for persisting an empty directory, not a real file
+                        if len(name) > 2:
+                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                     else:
-                        raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
+                        filepath = os.path.join(stream_name, name)
+                        afile = self.find_or_create(filepath, FILE)
+                        if isinstance(afile, ArvadosFile):
+                            afile.add_segment(blocks, pos, size)
+                        else:
+                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                 else:
                     # error!
                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
index 71e101cf4c5073d40e78f73c0bf46a9ff231f937..1b6376e9be1035dac69bb34da974c2c486477627 100644 (file)
@@ -377,7 +377,7 @@ class KeepClient(object):
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
-                    self._setcurltimeouts(curl, timeout)
+                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 
                     try:
                         curl.perform()
@@ -421,6 +421,10 @@ class KeepClient(object):
                 _logger.info("HEAD %s: %s bytes",
                          self._result['status_code'],
                          self._result.get('content-length'))
+                if self._result['headers'].get('x-keep-locator'):
+                    # This is a response to a remote block copy request;
+                    # return the locator of the local copy.
+                    return self._result['headers'].get('x-keep-locator')
                 return True
 
             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
@@ -512,7 +516,7 @@ class KeepClient(object):
                 self.upload_counter.add(len(body))
             return True
 
-        def _setcurltimeouts(self, curl, timeouts):
+        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
@@ -525,8 +529,9 @@ class KeepClient(object):
                 conn_t, xfer_t = (timeouts, timeouts)
                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
-            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+            if not ignore_bandwidth:
+                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
             if isinstance(header_line, bytes):
@@ -975,6 +980,11 @@ class KeepClient(object):
         else:
             return None
 
+    def refresh_signature(self, loc):
+        """Ask Keep to get the remote block and return its local signature"""
+        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
+        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
+
     @retry.retry_method
     def head(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="HEAD", **kwargs)
@@ -983,7 +993,7 @@ class KeepClient(object):
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1024,11 +1034,11 @@ class KeepClient(object):
 
             self.misses_counter.add(1)
 
-            headers = {
-                'X-Request-Id': (request_id or
-                                 (hasattr(self, 'api_client') and self.api_client.request_id) or
-                                 arvados.util.new_request_id()),
-            }
+            if headers is None:
+                headers = {}
+            headers['X-Request-Id'] = (request_id or
+                                        (hasattr(self, 'api_client') and self.api_client.request_id) or
+                                        arvados.util.new_request_id())
 
             # If the locator has hints specifying a prefix (indicating a
             # remote keepproxy) or the UUID of a local gateway service,
@@ -1085,10 +1095,7 @@ class KeepClient(object):
 
             # Always cache the result, then return it if we succeeded.
             if loop.success():
-                if method == "HEAD":
-                    return True
-                else:
-                    return blob
+                return blob
         finally:
             if slot is not None:
                 slot.set(blob)
index 722cc56046c99777f864833be641e81914039af5..ac18c44c6844c2f54fbc54d91acf094778834b3c 100644 (file)
@@ -1162,11 +1162,14 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
 class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
     KEEP_SERVER = {}
+    local_locator_re = r"[0-9a-f]{32}\+\d+\+A[a-f0-9]{40}@[a-f0-9]{8}"
+    remote_locator_re = r"[0-9a-f]{32}\+\d+\+R[a-z]{5}-[a-f0-9]{40}@[a-f0-9]{8}"
 
     def setUp(self):
         self.keep_put = getattr(arvados.keep.KeepClient, 'put')
 
-    def test_repacked_block_submission_get_permission_token(self):
+    @mock.patch('arvados.keep.KeepClient.put', autospec=True)
+    def test_repacked_block_submission_get_permission_token(self, mocked_put):
         '''
         Make sure that those blocks that are committed after repacking small ones,
         get their permission tokens assigned on the collection manifest.
@@ -1176,19 +1179,65 @@ class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServ
             time.sleep(1)
             return self.keep_put(*args, **kwargs)
 
-        re_locator = "[0-9a-f]{32}\+\d+\+A[a-f0-9]{40}@[a-f0-9]{8}"
-
-        with mock.patch('arvados.keep.KeepClient.put', autospec=True) as mocked_put:
-            mocked_put.side_effect = wrapped_keep_put
-            c = Collection()
-            # Write 70 files ~1MiB each so we force to produce 1 big block by repacking
-            # small ones before finishing the upload.
-            for i in range(70):
-                f = c.open("file_{}.txt".format(i), 'wb')
-                f.write(random.choice('abcdefghijklmnopqrstuvwxyz') * (2**20+i))
-                f.close(flush=False)
-            # We should get 2 blocks with their tokens
-            self.assertEqual(len(re.findall(re_locator, c.manifest_text())), 2)
+        mocked_put.side_effect = wrapped_keep_put
+        c = Collection()
+        # Write 70 files ~1MiB each so we force to produce 1 big block by repacking
+        # small ones before finishing the upload.
+        for i in range(70):
+            f = c.open("file_{}.txt".format(i), 'wb')
+            f.write(random.choice('abcdefghijklmnopqrstuvwxyz') * (2**20+i))
+            f.close(flush=False)
+        # We should get 2 blocks with their tokens
+        self.assertEqual(len(re.findall(self.local_locator_re, c.manifest_text())), 2)
+
+    @mock.patch('arvados.keep.KeepClient.refresh_signature')
+    def test_copy_remote_blocks_on_save_new(self, rs_mock):
+        remote_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+Remote-" + "a" * 40 + "@abcdef01"
+        local_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+A" + "b" * 40 + "@abcdef01"
+        rs_mock.return_value = local_block_loc
+        c = Collection(". " + remote_block_loc + " 0:3:foofile.txt\n")
+        self.assertEqual(
+            len(re.findall(self.remote_locator_re, c.manifest_text())), 1)
+        self.assertEqual(
+            len(re.findall(self.local_locator_re, c.manifest_text())), 0)
+        c.save_new()
+        rs_mock.assert_called()
+        self.assertEqual(
+            len(re.findall(self.remote_locator_re, c.manifest_text())), 0)
+        self.assertEqual(
+            len(re.findall(self.local_locator_re, c.manifest_text())), 1)
+
+    @mock.patch('arvados.keep.KeepClient.refresh_signature')
+    def test_copy_remote_blocks_on_save(self, rs_mock):
+        remote_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+Remote-" + "a" * 40 + "@abcdef01"
+        local_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+A" + "b" * 40 + "@abcdef01"
+        rs_mock.return_value = local_block_loc
+        # Remote collection
+        remote_c = Collection(". " + remote_block_loc + " 0:3:foofile.txt\n")
+        self.assertEqual(
+            len(re.findall(self.remote_locator_re, remote_c.manifest_text())), 1)
+        # Local collection
+        local_c = Collection()
+        with local_c.open('barfile.txt', 'wb') as f:
+            f.write('bar')
+        local_c.save_new()
+        self.assertEqual(
+            len(re.findall(self.local_locator_re, local_c.manifest_text())), 1)
+        self.assertEqual(
+            len(re.findall(self.remote_locator_re, local_c.manifest_text())), 0)
+        # Copy remote file to local collection
+        local_c.copy('./foofile.txt', './copied/foofile.txt', remote_c)
+        self.assertEqual(
+            len(re.findall(self.local_locator_re, local_c.manifest_text())), 1)
+        self.assertEqual(
+            len(re.findall(self.remote_locator_re, local_c.manifest_text())), 1)
+        # Save local collection: remote block should be copied
+        local_c.save()
+        rs_mock.assert_called()
+        self.assertEqual(
+            len(re.findall(self.local_locator_re, local_c.manifest_text())), 2)
+        self.assertEqual(
+            len(re.findall(self.remote_locator_re, local_c.manifest_text())), 0)
 
 
 class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
index a7b79933bbc2999381fea887ac3a70e77f346b3c..d6b3a2a12dc6f4d4d8bb997c25f43cc21e8cda62 100644 (file)
@@ -342,6 +342,25 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                 None)
 
+    def test_refresh_signature(self):
+        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
+        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
+        local_loc = blk_digest+'+A'+blk_sig
+        remote_loc = blk_digest+'+R'+blk_sig
+        api_client = self.mock_keep_services(count=1)
+        headers = {'X-Keep-Locator':local_loc}
+        with tutil.mock_keep_responses('', 200, **headers):
+            # Check that the translated locator gets returned
+            keep_client = arvados.KeepClient(api_client=api_client)
+            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
+            # Check that refresh_signature() uses the correct method and headers
+            keep_client._get_or_head = mock.MagicMock()
+            keep_client.refresh_signature(remote_loc)
+            args, kwargs = keep_client._get_or_head.call_args_list[0]
+            self.assertIn(remote_loc, args)
+            self.assertEqual("HEAD", kwargs['method'])
+            self.assertIn('X-Keep-Signature', kwargs['headers'])
+
     # test_*_timeout verify that KeepClient instructs pycurl to use
     # the appropriate connection and read timeouts. They don't care
     # whether pycurl actually exhibits the expected timeout behavior
@@ -393,10 +412,10 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
             self.assertEqual(
                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
-                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+                None)
             self.assertEqual(
                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
-                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
+                None)
 
     def test_proxy_get_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
@@ -427,10 +446,10 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
             self.assertEqual(
                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
-                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+                None)
             self.assertEqual(
                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
-                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
+                None)
 
     def test_proxy_put_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
@@ -541,6 +560,43 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual(1, req_mock.call_count)
 
 
+@tutil.skip_sleep
+class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+    def setUp(self):
+        self.api_client = self.mock_keep_services(count=2)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.data = b'xyzzy'
+        self.locator = '1271ed5ef305aadabc605b1609e24c52'
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_get_request_cache(self, get_mock):
+        with tutil.mock_keep_responses(self.data, 200, 200):
+            self.keep_client.get(self.locator)
+            self.keep_client.get(self.locator)
+        # Response already cached; don't require more than one backend request
+        get_mock.assert_called_once()
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_head_request_cache(self, get_mock):
+        with tutil.mock_keep_responses(self.data, 200, 200):
+            self.keep_client.head(self.locator)
+            self.keep_client.head(self.locator)
+        # HEAD responses are not cached, so they can't be mistaken for GET results
+        self.assertEqual(2, get_mock.call_count)
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_head_and_then_get_return_different_responses(self, get_mock):
+        head_resp = None
+        get_resp = None
+        get_mock.side_effect = ['first response', 'second response']
+        with tutil.mock_keep_responses(self.data, 200, 200):
+            head_resp = self.keep_client.head(self.locator)
+            get_resp = self.keep_client.get(self.locator)
+        self.assertEqual('first response', head_resp)
+        # First response was not cached because it was from a HEAD request.
+        self.assertNotEqual(head_resp, get_resp)
+
+
 @tutil.skip_sleep
 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
     def setUp(self):
@@ -830,7 +886,7 @@ class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
         loc = kc.put(self.DATA, copies=1, num_retries=0)
         self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
         with self.assertTakesGreater(self.TIMEOUT_TIME):
-            with self.assertRaises(arvados.errors.KeepReadError) as e:
+            with self.assertRaises(arvados.errors.KeepReadError):
                 kc.get(loc, num_retries=0)
         with self.assertTakesGreater(self.TIMEOUT_TIME):
             with self.assertRaises(arvados.errors.KeepWriteError):
@@ -842,14 +898,13 @@ class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
         self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
         self.server.setdelays(response=self.TIMEOUT_TIME)
         with self.assertTakesGreater(self.TIMEOUT_TIME):
-            with self.assertRaises(arvados.errors.KeepReadError) as e:
+            with self.assertRaises(arvados.errors.KeepReadError):
                 kc.get(loc, num_retries=0)
         with self.assertTakesGreater(self.TIMEOUT_TIME):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 kc.put(self.DATA, copies=1, num_retries=0)
         with self.assertTakesGreater(self.TIMEOUT_TIME):
-            with self.assertRaises(arvados.errors.KeepReadError) as e:
-                kc.head(loc, num_retries=0)
+            kc.head(loc, num_retries=0)
 
     def test_low_bandwidth_with_server_mid_delay_failure(self):
         kc = self.keepClient()
index 3cf79b2d8f32d73064e27c2d831f5f6dde6baafd..e9267dcb3bf84e630ac6b01168e77067793a4ce9 100644 (file)
@@ -149,7 +149,7 @@ GEM
       railties (>= 4)
       request_store (~> 1.0)
     logstash-event (1.2.02)
-    loofah (2.2.2)
+    loofah (2.2.3)
       crass (~> 1.0.2)
       nokogiri (>= 1.5.9)
     mail (2.7.0)
@@ -171,7 +171,7 @@ GEM
     net-ssh (4.2.0)
     net-ssh-gateway (2.0.0)
       net-ssh (>= 4.0.0)
-    nokogiri (1.8.2)
+    nokogiri (1.8.5)
       mini_portile2 (~> 2.3.0)
     oauth2 (1.4.0)
       faraday (>= 0.8, < 0.13)
index 24a2214d974d91d61f5a884226056860d73e2697..3cfe5b54fda2a2a90afa168938ab5ff9e4760064 100644 (file)
@@ -597,6 +597,12 @@ class ApplicationController < ActionController::Base
       limit: { type: 'integer', required: false, default: DEFAULT_LIMIT },
       offset: { type: 'integer', required: false, default: 0 },
       count: { type: 'string', required: false, default: 'exact' },
+      cluster_id: {
+        type: 'string',
+        description: "List objects on a remote federated cluster instead of the current one.",
+        location: "query",
+        required: false,
+      },
     }
   end
 
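The new cluster_id parameter asks the federation-aware controller to forward a list request to a remote cluster instead of answering locally. A hedged sketch of what a caller might send; the base URL, token value, and five-character cluster ID are placeholders:

    package main

    import (
        "fmt"
        "net/http"
        "net/url"
    )

    // listOnRemote builds a collections#index request that asks the
    // local controller to proxy the listing to a federated peer via
    // the cluster_id query parameter added above.
    func listOnRemote(baseURL, token, clusterID string) (*http.Request, error) {
        u, err := url.Parse(baseURL + "/arvados/v1/collections")
        if err != nil {
            return nil, err
        }
        q := u.Query()
        q.Set("cluster_id", clusterID) // e.g. "zbbbb"
        u.RawQuery = q.Encode()
        req, err := http.NewRequest("GET", u.String(), nil)
        if err != nil {
            return nil, err
        }
        req.Header.Set("Authorization", "Bearer "+token)
        return req, nil
    }

    func main() {
        req, _ := listOnRemote("https://zzzzz.example.com", "token-placeholder", "zbbbb")
        fmt.Println(req.URL)
    }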
index 35a19b136cc3298892a550d55c800bdcbda48657..5fb7036bf2596acb8425c2e6695b8a63ad259da8 100644 (file)
@@ -9,7 +9,7 @@ class UserNotifier < ActionMailer::Base
 
   def account_is_setup(user)
     @user = user
-    mail(to: user.email, subject: 'Welcome to Curoverse - shell account enabled')
+    mail(to: user.email, subject: 'Welcome to Arvados - shell account enabled')
   end
 
 end
index 718ffc0d0a51416440ff75ec98c442cfe64423b9..487043ee3549d8afe915f9abeeaeab2c8f252707 100644 (file)
@@ -496,7 +496,14 @@ class Collection < ArvadosModel
     if loc = Keep::Locator.parse(search_term)
       loc.strip_hints!
       coll_match = readable_by(*readers).where(portable_data_hash: loc.to_s).limit(1)
-      return get_compatible_images(readers, pattern, coll_match)
+      if coll_match.any? or Rails.configuration.remote_hosts.length == 0
+        return get_compatible_images(readers, pattern, coll_match)
+      else
+        # Allow a bare PDH that doesn't exist in the local database,
+        # so that federated container requests referring to remotely
+        # stored images can still validate.
+        return [Collection.new(portable_data_hash: loc.to_s)]
+      end
     end
 
     if search_tag.nil? and (n = search_term.index(":"))
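In Go terms, the image-lookup branch above reduces to: use the local match when one exists or when no remote clusters are configured; otherwise accept the bare portable data hash so a federated container request can still validate. A sketch under those assumptions (the types stand in for the Rails model):

    package main

    import "fmt"

    type collection struct{ portableDataHash string }

    // resolveImagePDH mirrors the new branch: localMatches is the
    // result of the readable_by query, remoteHosts the configured
    // federation peers.
    func resolveImagePDH(pdh string, localMatches []collection, remoteHosts map[string]string) []collection {
        if len(localMatches) > 0 || len(remoteHosts) == 0 {
            return localMatches
        }
        // No local copy, but federation is configured: assume the
        // image lives on a remote cluster and validate the bare PDH.
        return []collection{{portableDataHash: pdh}}
    }

    func main() {
        remotes := map[string]string{"zbbbb": "bar.example.com"}
        out := resolveImagePDH("acbd18db4cc2f85cedef654fccc4a4d8+3", nil, remotes)
        fmt.Println(out[0].portableDataHash)
    }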
index 0d8453174e205e85ab3f79e01a32cc530478a4a1..ac67040edf799465c1dda671e0a4d0eb80cf9483 100644 (file)
@@ -223,7 +223,11 @@ class Container < ArvadosModel
       if mount['kind'] != 'collection'
         next
       end
-      if (uuid = mount.delete 'uuid')
+
+      uuid = mount.delete 'uuid'
+
+      if mount['portable_data_hash'].nil? and !uuid.nil?
+        # PDH not supplied, try by UUID
         c = Collection.
           readable_by(current_user).
           where(uuid: uuid).
@@ -232,13 +236,7 @@ class Container < ArvadosModel
         if !c
           raise ArvadosModel::UnresolvableContainerError.new "cannot mount collection #{uuid.inspect}: not found"
         end
-        if mount['portable_data_hash'].nil?
-          # PDH not supplied by client
-          mount['portable_data_hash'] = c.portable_data_hash
-        elsif mount['portable_data_hash'] != c.portable_data_hash
-          # UUID and PDH supplied by client, but they don't agree
-          raise ArgumentError.new "cannot mount collection #{uuid.inspect}: current portable_data_hash #{c.portable_data_hash.inspect} does not match #{c['portable_data_hash'].inspect} in request"
-        end
+        mount['portable_data_hash'] = c.portable_data_hash
       end
     end
     return c_mounts
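The reworked mount resolution above makes a client-supplied portable_data_hash authoritative: the UUID is consulted only when no PDH was given, and the old UUID-vs-PDH consistency check is dropped. A compact Go sketch of the new flow, with lookup standing in for the readable_by query:

    package main

    import (
        "errors"
        "fmt"
    )

    type mount struct {
        Kind             string
        UUID             string
        PortableDataHash string
    }

    // resolveMount fills in PortableDataHash from the collection
    // record only when the client did not supply one; a supplied PDH
    // wins even if a UUID is also present.
    func resolveMount(m *mount, lookup func(uuid string) (string, error)) error {
        if m.Kind != "collection" {
            return nil
        }
        uuid := m.UUID
        m.UUID = "" // the uuid key is consumed during resolution
        if m.PortableDataHash != "" || uuid == "" {
            return nil
        }
        pdh, err := lookup(uuid)
        if err != nil {
            return fmt.Errorf("cannot mount collection %q: not found", uuid)
        }
        m.PortableDataHash = pdh
        return nil
    }

    func main() {
        m := &mount{Kind: "collection", UUID: "zzzzz-4zz18-zzzzzzzzzzzzzzz"}
        err := resolveMount(m, func(string) (string, error) {
            return "", errors.New("not readable")
        })
        fmt.Println(err)
    }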
@@ -493,10 +491,14 @@ class Container < ArvadosModel
       return false
     end
 
-    if current_api_client_authorization.andand.uuid.andand == self.auth_uuid
-      # The contained process itself can update progress indicators,
-      # but can't change priority etc.
-      permitted = permitted & (progress_attrs + final_attrs + [:state] - [:log])
+    if self.state == Running &&
+       !current_api_client_authorization.nil? &&
+       (current_api_client_authorization.uuid == self.auth_uuid ||
+        current_api_client_authorization.token == self.runtime_token)
+      # The contained process itself can write final attrs but can't
+      # change priority or log.
+      permitted.push *final_attrs
+      permitted = permitted - [:log, :priority]
     elsif self.locked_by_uuid && self.locked_by_uuid != current_api_client_authorization.andand.uuid
       # When locked, progress fields cannot be updated by the wrong
       # dispatcher, even though it has admin privileges.
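The permission change above reads as set arithmetic: when the container is Running and the caller presents either the container's own auth or its runtime_token, the permitted set gains the final attributes and loses log and priority. A sketch with illustrative attribute names standing in for the Rails symbols:

    package main

    import (
        "fmt"
        "sort"
    )

    // permittedForContainedProcess: start from the already-permitted
    // attributes, add the final ones, then strip log and priority.
    func permittedForContainedProcess(permitted, finalAttrs []string) []string {
        set := map[string]bool{}
        for _, a := range permitted {
            set[a] = true
        }
        for _, a := range finalAttrs {
            set[a] = true
        }
        delete(set, "log")
        delete(set, "priority")
        out := make([]string, 0, len(set))
        for a := range set {
            out = append(out, a)
        }
        sort.Strings(out)
        return out
    }

    func main() {
        fmt.Println(permittedForContainedProcess(
            []string{"progress", "priority", "runtime_status"},
            []string{"output", "exit_code", "state"}))
    }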
index b01597c05bf0280ea6cc6fa052ba98ff70526994..728a2a1fa2abc9c1261c64a7fddf077dc6747446 100644 (file)
@@ -638,7 +638,7 @@ class Arvados::V1::UsersControllerTest < ActionController::TestCase
 
     assert_equal Rails.configuration.user_notifier_email_from, setup_email.from[0]
     assert_equal 'foo@example.com', setup_email.to[0]
-    assert_equal 'Welcome to Curoverse - shell account enabled', setup_email.subject
+    assert_equal 'Welcome to Arvados - shell account enabled', setup_email.subject
     assert (setup_email.body.to_s.include? 'Your Arvados shell account has been set up'),
         'Expected Your Arvados shell account has been set up in email body'
     assert (setup_email.body.to_s.include? "#{Rails.configuration.workbench_address}users/#{created['uuid']}/virtual_machines"), 'Expected virtual machines url in email body'
index 0e61db7bcd9d5cc0cb185c4766a2e597c6d6ed4a..44737524e5f583cb76bb62a6aa0ff8af5ca91319 100644 (file)
@@ -63,8 +63,8 @@ class RemoteUsersTest < ActionDispatch::IntegrationTest
     ready.pop
     @remote_server = srv
     @remote_host = "127.0.0.1:#{srv.config[:Port]}"
-    Rails.configuration.remote_hosts['zbbbb'] = @remote_host
-    Rails.configuration.remote_hosts['zbork'] = @remote_host
+    Rails.configuration.remote_hosts = Rails.configuration.remote_hosts.merge({'zbbbb' => @remote_host,
+                                                                               'zbork' => @remote_host})
     Arvados::V1::SchemaController.any_instance.stubs(:root_url).returns "https://#{@remote_host}"
     @stub_status = 200
     @stub_content = {
index 8ff216e28caf8a598c5b6fbbf46a9d342e4a7c35..a7700573d4d83eade9badb4d9c4b0650ddc0947e 100644 (file)
@@ -441,6 +441,27 @@ class ContainerRequestTest < ActiveSupport::TestCase
         "path" => "/foo",
       }
     end],
+   [{"/out" => {
+      "kind" => "collection",
+      "portable_data_hash" => "1f4b0bc7583c2a7f9102c395f4ffc5e3+45",
+      "path" => "/foo"}},
+    lambda do |resolved|
+      resolved["/out"] == {
+        "portable_data_hash" => "1f4b0bc7583c2a7f9102c395f4ffc5e3+45",
+        "kind" => "collection",
+        "path" => "/foo",
+      }
+    end],
+    # Empty collection
+    [{"/out" => {
+      "kind" => "collection",
+      "path" => "/foo"}},
+    lambda do |resolved|
+      resolved["/out"] == {
+        "kind" => "collection",
+        "path" => "/foo",
+      }
+    end],
   ].each do |mounts, okfunc|
     test "resolve mounts #{mounts.inspect} to values" do
       set_user_from_auth :active
@@ -474,9 +495,8 @@ class ContainerRequestTest < ActiveSupport::TestCase
         "path" => "/foo",
       },
     }
-    assert_raises(ArgumentError) do
-      Container.resolve_mounts(m)
-    end
+    resolved_mounts = Container.resolve_mounts(m)
+    assert_equal m['portable_data_hash'], resolved_mounts['portable_data_hash']
   end
 
   ['arvados/apitestfixture:latest',
@@ -512,6 +532,12 @@ class ContainerRequestTest < ActiveSupport::TestCase
     end
   end
 
+  test "allow unrecognized container when there are remote_hosts" do
+    set_user_from_auth :active
+    Rails.configuration.remote_hosts = {"foooo" => "bar.com"}
+    Container.resolve_container_image('acbd18db4cc2f85cedef654fccc4a4d8+3')
+  end
+
   test "migrated docker image" do
     Rails.configuration.docker_image_formats = ['v2']
     add_docker19_migration_link
index 491022ad8d5a9cd6e47e1cf7727a5cba92d54ce4..90b4f13bf597b5b9ea306dec04b698e75fb98ae3 100644 (file)
@@ -777,25 +777,41 @@ class ContainerTest < ActiveSupport::TestCase
     assert_equal [logpdh_time2], Collection.where(uuid: [cr1log_uuid, cr2log_uuid]).to_a.collect(&:portable_data_hash).uniq
   end
 
-  test "auth_uuid can set output, progress, runtime_status, state on running container -- but not log" do
-    set_user_from_auth :active
-    c, _ = minimal_new
-    set_user_from_auth :dispatch1
-    c.lock
-    c.update_attributes! state: Container::Running
-
-    auth = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
-    Thread.current[:api_client_authorization] = auth
-    Thread.current[:api_client] = auth.api_client
-    Thread.current[:token] = auth.token
-    Thread.current[:user] = auth.user
+  ["auth_uuid", "runtime_token"].each do |tok|
+    test "#{tok} can set output, progress, runtime_status, state on running container -- but not log" do
+      if tok == "runtime_token"
+        set_user_from_auth :spectator
+        c, _ = minimal_new(container_image: "9ae44d5792468c58bcf85ce7353c7027+124",
+                           runtime_token: api_client_authorizations(:active).token)
+      else
+        set_user_from_auth :active
+        c, _ = minimal_new
+      end
+      set_user_from_auth :dispatch1
+      c.lock
+      c.update_attributes! state: Container::Running
+
+      if tok == "runtime_token"
+        auth = ApiClientAuthorization.validate(token: c.runtime_token)
+        Thread.current[:api_client_authorization] = auth
+        Thread.current[:api_client] = auth.api_client
+        Thread.current[:token] = auth.token
+        Thread.current[:user] = auth.user
+      else
+        auth = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
+        Thread.current[:api_client_authorization] = auth
+        Thread.current[:api_client] = auth.api_client
+        Thread.current[:token] = auth.token
+        Thread.current[:user] = auth.user
+      end
 
-    assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
-    assert c.update_attributes(runtime_status: {'warning' => 'something happened'})
-    assert c.update_attributes(progress: 0.5)
-    refute c.update_attributes(log: collections(:real_log_collection).portable_data_hash)
-    c.reload
-    assert c.update_attributes(state: Container::Complete, exit_code: 0)
+      assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
+      assert c.update_attributes(runtime_status: {'warning' => 'something happened'})
+      assert c.update_attributes(progress: 0.5)
+      refute c.update_attributes(log: collections(:real_log_collection).portable_data_hash)
+      c.reload
+      assert c.update_attributes(state: Container::Complete, exit_code: 0)
+    end
   end
 
   test "not allowed to set output that is not readable by current user" do
index fc6a97cf7480c645206c867e3449822bfcfa41a5..41e2adb9c3d35a2a6d52f9244b666913eff3e1d5 100644 (file)
@@ -127,6 +127,7 @@ class JobTest < ActiveSupport::TestCase
     'locator' => BAD_COLLECTION,
   }.each_pair do |spec_type, image_spec|
     test "Job validation fails with nonexistent Docker image #{spec_type}" do
+      Rails.configuration.remote_hosts = {}
       job = Job.new job_attrs(runtime_constraints:
                               {'docker_image' => image_spec})
       assert(job.invalid?, "nonexistent Docker image #{spec_type} was valid")
index e2c4da70200ff3db00a206f7692e43e6626bc404..008259c0b65041146ed4e59e29eff34876bc6204 100644 (file)
@@ -16,7 +16,7 @@ class UserNotifierTest < ActionMailer::TestCase
     # Test the body of the sent email contains what we expect it to
     assert_equal Rails.configuration.user_notifier_email_from, email.from.first
     assert_equal user.email, email.to.first
-    assert_equal 'Welcome to Curoverse - shell account enabled', email.subject
+    assert_equal 'Welcome to Arvados - shell account enabled', email.subject
     assert (email.body.to_s.include? 'Your Arvados shell account has been set up'),
         'Expected Your Arvados shell account has been set up in email body'
     assert (email.body.to_s.include? Rails.configuration.workbench_address),
index 27fb8367f50e05764d45c77dc362497f7f702847..1c6c58009fac71bd5fa5015a4922821937e61d5a 100644 (file)
@@ -32,7 +32,6 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
-       "github.com/shirou/gopsutil/process"
        "golang.org/x/net/context"
 
        dockertypes "github.com/docker/docker/api/types"
@@ -61,6 +60,7 @@ type IKeepClient interface {
        PutB(buf []byte) (string, int, error)
        ReadAt(locator string, p []byte, off int) (int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
+       LocalLocator(locator string) (string, error)
        ClearBlockCache()
 }
 
@@ -79,6 +79,7 @@ type ThinDockerClient interface {
        ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
        ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
        ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
+       ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error)
        ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
        ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
        ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
@@ -91,10 +92,30 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-       Docker          ThinDockerClient
-       client          *arvados.Client
-       ArvClient       IArvadosClient
-       Kc              IKeepClient
+       Docker ThinDockerClient
+
+       // Dispatcher client is initialized with the Dispatcher token.
+       // This is a privileged token used to manage container status
+       // and logs.
+       //
+       // We have both dispatcherClient and DispatcherArvClient
+       // because there are two different incompatible Arvados Go
+       // SDKs and we have to use both (hopefully this gets fixed in
+       // #14467)
+       dispatcherClient     *arvados.Client
+       DispatcherArvClient  IArvadosClient
+       DispatcherKeepClient IKeepClient
+
+       // Container client is initialized with the Container token.
+       // This token controls the permissions of the container, and
+       // must be used for operations such as reading collections.
+       //
+       // Same comment as above applies to
+       // containerClient/ContainerArvClient.
+       containerClient     *arvados.Client
+       ContainerArvClient  IArvadosClient
+       ContainerKeepClient IKeepClient
+
        Container       arvados.Container
        ContainerConfig dockercontainer.Config
        HostConfig      dockercontainer.HostConfig
@@ -121,12 +142,10 @@ type ContainerRunner struct {
        SigChan         chan os.Signal
        ArvMountExit    chan error
        SecretMounts    map[string]arvados.Mount
-       MkArvClient     func(token string) (IArvadosClient, error)
+       MkArvClient     func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
        finalState      string
        parentTemp      string
 
-       ListProcesses func() ([]PsProcess, error)
-
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
        hoststatLogger   io.WriteCloser
@@ -149,11 +168,13 @@ type ContainerRunner struct {
 
        cStateLock sync.Mutex
        cCancelled bool // StopContainer() invoked
+       cRemoved   bool // docker confirmed the container no longer exists
+
+       enableNetwork string // one of "default" or "always"
+       networkMode   string // passed through to HostConfig.NetworkMode
+       arvMountLog   *ThrottledLogger
 
-       enableNetwork   string // one of "default" or "always"
-       networkMode     string // passed through to HostConfig.NetworkMode
-       arvMountLog     *ThrottledLogger
-       checkContainerd time.Duration
+       containerWatchdogInterval time.Duration
 }
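The struct split above encodes a privilege boundary: one set of clients authenticated with the dispatcher's token (container state, logs), another with the container's own token (reading mounted collections, writing output). A hedged sketch of carrying both credential sets, with client standing in for the SDK types:

    package main

    import "fmt"

    // client is a stand-in for the Arvados SDK client types; only the
    // token matters for this sketch.
    type client struct{ token string }

    // runnerClients pairs the two credential sets the way the
    // ContainerRunner fields above do.
    type runnerClients struct {
        dispatcher client // privileged: container state, logs
        container  client // scoped: collections, output
    }

    func newRunnerClients(dispatcherToken, containerToken string) runnerClients {
        return runnerClients{
            dispatcher: client{token: dispatcherToken},
            container:  client{token: containerToken},
        }
    }

    func main() {
        rc := newRunnerClients("disp-token", "ctr-token")
        fmt.Println(rc.dispatcher.token != rc.container.token)
    }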
 
 // setupSignals sets up signal handling to gracefully terminate the underlying
@@ -187,6 +208,9 @@ func (runner *ContainerRunner) stop(sig os.Signal) {
        if err != nil {
                runner.CrunchLog.Printf("error removing container: %s", err)
        }
+       if err == nil || strings.Contains(err.Error(), "No such container: "+runner.ContainerID) {
+               runner.cRemoved = true
+       }
 }
 
 var errorBlacklist = []string{
@@ -231,7 +255,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
        runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
 
        var collection arvados.Collection
-       err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
+       err = runner.ContainerArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
        if err != nil {
                return fmt.Errorf("While getting container image collection: %v", err)
        }
@@ -252,7 +276,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
                runner.CrunchLog.Print("Loading Docker image from keep")
 
                var readCloser io.ReadCloser
-               readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
+               readCloser, err = runner.ContainerKeepClient.ManifestFileReader(manifest, img)
                if err != nil {
                        return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
                }
@@ -274,7 +298,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 
        runner.ContainerConfig.Image = imageID
 
-       runner.Kc.ClearBlockCache()
+       runner.ContainerKeepClient.ClearBlockCache()
 
        return nil
 }
@@ -575,7 +599,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if err != nil {
                                return fmt.Errorf("creating temp dir: %v", err)
                        }
-                       err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token)
+                       err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
                        if err != nil {
                                return err
                        }
@@ -842,13 +866,13 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
                return false, err
        }
        w := &ArvLogWriter{
-               ArvClient:     runner.ArvClient,
+               ArvClient:     runner.DispatcherArvClient,
                UUID:          runner.Container.UUID,
                loggingStream: label,
                writeCloser:   writer,
        }
 
-       reader, err := runner.ArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
+       reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
        if err != nil {
                return false, fmt.Errorf("error getting %s record: %v", label, err)
        }
@@ -898,12 +922,14 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                        if collId == "" {
                                collId = stdinMnt.PortableDataHash
                        }
-                       err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+                       err = runner.ContainerArvClient.Get("collections", collId, nil, &stdinColl)
                        if err != nil {
-                               return fmt.Errorf("While getting stding collection: %v", err)
+                               return fmt.Errorf("While getting stdin collection: %v", err)
                        }
 
-                       stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+                       stdinRdr, err = runner.ContainerKeepClient.ManifestFileReader(
+                               manifest.Manifest{Text: stdinColl.ManifestText},
+                               stdinMnt.Path)
                        if os.IsNotExist(err) {
                                return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
                        } else if err != nil {
@@ -1091,27 +1117,6 @@ func (runner *ContainerRunner) StartContainer() error {
        return nil
 }
 
-// checkContainerd checks if "containerd" is present in the process list.
-func (runner *ContainerRunner) CheckContainerd() error {
-       if runner.checkContainerd == 0 {
-               return nil
-       }
-       p, _ := runner.ListProcesses()
-       for _, i := range p {
-               e, _ := i.CmdlineSlice()
-               if len(e) > 0 {
-                       if strings.Index(e[0], "containerd") > -1 {
-                               return nil
-                       }
-               }
-       }
-
-       // Not found
-       runner.runBrokenNodeHook()
-       runner.stop(nil)
-       return fmt.Errorf("'containerd' not found in process list.")
-}
-
 // WaitFinish waits for the container to terminate, capture the exit code, and
 // close the stdout/stderr logging.
 func (runner *ContainerRunner) WaitFinish() error {
@@ -1124,26 +1129,31 @@ func (runner *ContainerRunner) WaitFinish() error {
                runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
        }
 
-       containerdGone := make(chan error)
-       defer close(containerdGone)
-       if runner.checkContainerd > 0 {
-               go func() {
-                       ticker := time.NewTicker(time.Duration(runner.checkContainerd))
-                       defer ticker.Stop()
-                       for {
-                               select {
-                               case <-ticker.C:
-                                       if ck := runner.CheckContainerd(); ck != nil {
-                                               containerdGone <- ck
-                                               return
-                                       }
-                               case <-containerdGone:
-                                       // Channel closed, quit goroutine
-                                       return
-                               }
+       containerGone := make(chan struct{})
+       go func() {
+               defer close(containerGone)
+               if runner.containerWatchdogInterval < 1 {
+                       runner.containerWatchdogInterval = time.Minute
+               }
+               for range time.NewTicker(runner.containerWatchdogInterval).C {
+                       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
+                       ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
+                       cancel()
+                       runner.cStateLock.Lock()
+                       done := runner.cRemoved || runner.ExitCode != nil
+                       runner.cStateLock.Unlock()
+                       if done {
+                               return
+                       } else if err != nil {
+                               runner.CrunchLog.Printf("Error inspecting container: %s", err)
+                               runner.checkBrokenNode(err)
+                               return
+                       } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
+                               runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State)
+                               return
                        }
-               }()
-       }
+               }
+       }()
 
        for {
                select {
@@ -1171,8 +1181,8 @@ func (runner *ContainerRunner) WaitFinish() error {
                        runner.stop(nil)
                        runTimeExceeded = nil
 
-               case err := <-containerdGone:
-                       return err
+               case <-containerGone:
+                       return errors.New("docker client never returned status")
                }
        }
 }
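The watchdog goroutine above replaces the old containerd process scan with a direct liveness probe: on every tick it inspects the container under a deadline, and gives up if the inspection fails or the container is neither running nor freshly created. A simplified sketch of the same pattern, with inspect standing in for Docker's ContainerInspect:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // watchdog polls inspect until done() reports completion, closing
    // the returned channel if the container vanishes. Each probe gets
    // its own deadline so a hung daemon cannot stall the loop forever.
    func watchdog(interval time.Duration, inspect func(context.Context) (bool, error), done func() bool) <-chan struct{} {
        gone := make(chan struct{})
        go func() {
            defer close(gone)
            ticker := time.NewTicker(interval)
            defer ticker.Stop()
            for range ticker.C {
                ctx, cancel := context.WithTimeout(context.Background(), interval)
                running, err := inspect(ctx)
                cancel()
                if done() {
                    return
                }
                if err != nil || !running {
                    return // caller treats this as "container gone"
                }
            }
        }()
        return gone
    }

    func main() {
        gone := watchdog(10*time.Millisecond,
            func(context.Context) (bool, error) { return false, errors.New("no such container") },
            func() bool { return false })
        <-gone
        fmt.Println("docker client never returned status")
    }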
@@ -1213,7 +1223,7 @@ func (runner *ContainerRunner) updateLogs() {
                }
 
                var updated arvados.Container
-               err = runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
                        "container": arvadosclient.Dict{"log": saved.PortableDataHash},
                }, &updated)
                if err != nil {
@@ -1231,7 +1241,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
        if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
                // Output may have been set directly by the container, so
                // refresh the container record to check.
-               err := runner.ArvClient.Get("containers", runner.Container.UUID,
+               err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
                        nil, &runner.Container)
                if err != nil {
                        return err
@@ -1244,9 +1254,9 @@ func (runner *ContainerRunner) CaptureOutput() error {
        }
 
        txt, err := (&copier{
-               client:        runner.client,
-               arvClient:     runner.ArvClient,
-               keepClient:    runner.Kc,
+               client:        runner.containerClient,
+               arvClient:     runner.ContainerArvClient,
+               keepClient:    runner.ContainerKeepClient,
                hostOutputDir: runner.HostOutputDir,
                ctrOutputDir:  runner.Container.OutputPath,
                binds:         runner.Binds,
@@ -1257,8 +1267,19 @@ func (runner *ContainerRunner) CaptureOutput() error {
        if err != nil {
                return err
        }
+       if n := len(regexp.MustCompile(` [0-9a-f]+\+\S*\+R`).FindAllStringIndex(txt, -1)); n > 0 {
+               runner.CrunchLog.Printf("Copying %d data blocks from remote input collections...", n)
+               fs, err := (&arvados.Collection{ManifestText: txt}).FileSystem(runner.containerClient, runner.ContainerKeepClient)
+               if err != nil {
+                       return err
+               }
+               txt, err = fs.MarshalManifest(".")
+               if err != nil {
+                       return err
+               }
+       }
        var resp arvados.Collection
-       err = runner.ArvClient.Create("collections", arvadosclient.Dict{
+       err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
                "ensure_unique_name": true,
                "collection": arvadosclient.Dict{
                        "is_trashed":    true,
@@ -1346,7 +1367,7 @@ func (runner *ContainerRunner) CommitLogs() error {
                // other further errors (such as failing to write the log to Keep!)
                // while shutting down
                runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
-                       ArvClient:     runner.ArvClient,
+                       ArvClient:     runner.DispatcherArvClient,
                        UUID:          runner.Container.UUID,
                        loggingStream: "crunch-run",
                        writeCloser:   nil,
@@ -1398,9 +1419,9 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
        reqBody := arvadosclient.Dict{"collection": updates}
        if runner.logUUID == "" {
                reqBody["ensure_unique_name"] = true
-               err = runner.ArvClient.Create("collections", reqBody, &response)
+               err = runner.DispatcherArvClient.Create("collections", reqBody, &response)
        } else {
-               err = runner.ArvClient.Update("collections", runner.logUUID, reqBody, &response)
+               err = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
        }
        if err != nil {
                return
@@ -1416,7 +1437,7 @@ func (runner *ContainerRunner) UpdateContainerRunning() error {
        if runner.cCancelled {
                return ErrCancelled
        }
-       return runner.ArvClient.Update("containers", runner.Container.UUID,
+       return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
                arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
 }
 
@@ -1428,7 +1449,7 @@ func (runner *ContainerRunner) ContainerToken() (string, error) {
        }
 
        var auth arvados.APIClientAuthorization
-       err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
+       err := runner.DispatcherArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
        if err != nil {
                return "", err
        }
@@ -1452,7 +1473,7 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
                        update["output"] = *runner.OutputPDH
                }
        }
-       return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
+       return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
 }
 
 // IsCancelled returns the value of Cancelled, with goroutine safety.
@@ -1469,7 +1490,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err
                return nil, err
        }
        return &ArvLogWriter{
-               ArvClient:     runner.ArvClient,
+               ArvClient:     runner.DispatcherArvClient,
                UUID:          runner.Container.UUID,
                loggingStream: name,
                writeCloser:   writer,
@@ -1547,12 +1568,6 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       // Sanity check that containerd is running.
-       err = runner.CheckContainerd()
-       if err != nil {
-               return
-       }
-
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
@@ -1621,7 +1636,7 @@ func (runner *ContainerRunner) Run() (err error) {
 // Fetch the current container record (uuid = runner.Container.UUID)
 // into runner.Container.
 func (runner *ContainerRunner) fetchContainerRecord() error {
-       reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
+       reader, err := runner.DispatcherArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
        if err != nil {
                return fmt.Errorf("error fetching container record: %v", err)
        }
@@ -1643,12 +1658,13 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
                return fmt.Errorf("error getting container token: %v", err)
        }
 
-       containerClient, err := runner.MkArvClient(containerToken)
+       runner.ContainerArvClient, runner.ContainerKeepClient,
+               runner.containerClient, err = runner.MkArvClient(containerToken)
        if err != nil {
                return fmt.Errorf("error creating container API client: %v", err)
        }
 
-       err = containerClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
+       err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
        if err != nil {
                if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {
                        return fmt.Errorf("error fetching secret_mounts: %v", err)
@@ -1662,37 +1678,37 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
 }
 
 // NewContainerRunner creates a new container runner.
-func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClient, docker ThinDockerClient, containerUUID string) (*ContainerRunner, error) {
+func NewContainerRunner(dispatcherClient *arvados.Client,
+       dispatcherArvClient IArvadosClient,
+       dispatcherKeepClient IKeepClient,
+       docker ThinDockerClient,
+       containerUUID string) (*ContainerRunner, error) {
+
        cr := &ContainerRunner{
-               client:    client,
-               ArvClient: api,
-               Kc:        kc,
-               Docker:    docker,
+               dispatcherClient:     dispatcherClient,
+               DispatcherArvClient:  dispatcherArvClient,
+               DispatcherKeepClient: dispatcherKeepClient,
+               Docker:               docker,
        }
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
        cr.MkTempDir = ioutil.TempDir
-       cr.ListProcesses = func() ([]PsProcess, error) {
-               pr, err := process.Processes()
-               if err != nil {
-                       return nil, err
-               }
-               ps := make([]PsProcess, len(pr))
-               for i, j := range pr {
-                       ps[i] = j
-               }
-               return ps, nil
-       }
-       cr.MkArvClient = func(token string) (IArvadosClient, error) {
+       cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
                cl, err := arvadosclient.MakeArvadosClient()
                if err != nil {
-                       return nil, err
+                       return nil, nil, nil, err
                }
                cl.ApiToken = token
-               return cl, nil
+               kc, err := keepclient.MakeKeepClient(cl)
+               if err != nil {
+                       return nil, nil, nil, err
+               }
+               c2 := arvados.NewClientFromEnv()
+               c2.AuthToken = token
+               return cl, kc, c2, nil
        }
        var err error
-       cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.client, cr.Kc)
+       cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.dispatcherClient, cr.DispatcherKeepClient)
        if err != nil {
                return nil, err
        }
@@ -1704,7 +1720,7 @@ func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClie
        cr.CrunchLog = NewThrottledLogger(w)
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
 
-       loadLogThrottleParams(api)
+       loadLogThrottleParams(dispatcherArvClient)
        go cr.updateLogs()
 
        return cr, nil
@@ -1730,7 +1746,7 @@ func main() {
        `)
        memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
        getVersion := flag.Bool("version", false, "Print version information and exit.")
-       checkContainerd := flag.Duration("check-containerd", 60*time.Second, "Periodic check if (docker-)containerd is running (use 0s to disable).")
+       flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
 
        detached := false
        if len(os.Args) > 1 && os.Args[1] == "-detached" {
@@ -1743,6 +1759,7 @@ func main() {
                os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
                detached = true
        }
+
        flag.Parse()
 
        switch {
@@ -1808,7 +1825,6 @@ func main() {
        cr.expectCgroupParent = *cgroupParent
        cr.enableNetwork = *enableNetwork
        cr.networkMode = *networkMode
-       cr.checkContainerd = *checkContainerd
        if *cgroupParentSubsystem != "" {
                p := findCgroup(*cgroupParentSubsystem)
                cr.setCgroupParent = p
index 217d4236ba6728619f20e6158589c9129d145e68..3fdd440e3e9168e2ed3b4c1e7234b3e93eb2f50c 100644 (file)
@@ -47,6 +47,7 @@ var _ = Suite(&TestSuite{})
 type TestSuite struct {
        client *arvados.Client
        docker *TestDockerClient
+       runner *ContainerRunner
 }
 
 func (s *TestSuite) SetUpTest(c *C) {
@@ -103,6 +104,7 @@ type TestDockerClient struct {
        api         *ArvTestClient
        realTemp    string
        calledWait  bool
+       ctrExited   bool
 }
 
 func NewTestDockerClient() *TestDockerClient {
@@ -176,6 +178,17 @@ func (t *TestDockerClient) ContainerWait(ctx context.Context, container string,
        return body, err
 }
 
+func (t *TestDockerClient) ContainerInspect(ctx context.Context, id string) (c dockertypes.ContainerJSON, err error) {
+       c.ContainerJSONBase = &dockertypes.ContainerJSONBase{}
+       c.ID = "abcde"
+       if t.ctrExited {
+               c.State = &dockertypes.ContainerState{Status: "exited", Dead: true}
+       } else {
+               c.State = &dockertypes.ContainerState{Status: "running", Pid: 1234, Running: true}
+       }
+       return
+}
+
 func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
        if t.exitCode == 2 {
                return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
@@ -362,6 +375,10 @@ call:
        return nil
 }
 
+func (client *KeepTestClient) LocalLocator(locator string) (string, error) {
+       return locator, nil
+}
+
 func (client *KeepTestClient) PutB(buf []byte) (string, int, error) {
        client.Content = buf
        return fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf)), len(buf), nil
@@ -425,10 +442,14 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
 }
 
 func (s *TestSuite) TestLoadImage(c *C) {
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{},
+               &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
+
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-       c.Assert(err, IsNil)
+       cr.ContainerArvClient = &ArvTestClient{}
+       cr.ContainerKeepClient = kc
 
        _, err = cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
        c.Check(err, IsNil)
@@ -475,6 +496,9 @@ func (ArvErrorTestClient) Create(resourceType string,
 }
 
 func (ArvErrorTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
+       if method == "GET" && resourceType == "containers" && action == "auth" {
+               return nil
+       }
        return errors.New("ArvError")
 }
 
@@ -507,6 +531,10 @@ func (*KeepErrorTestClient) PutB(buf []byte) (string, int, error) {
        return "", 0, errors.New("KeepError")
 }
 
+func (*KeepErrorTestClient) LocalLocator(string) (string, error) {
+       return "", errors.New("KeepError")
+}
+
 type KeepReadErrorTestClient struct {
        KeepTestClient
 }
@@ -535,8 +563,12 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
        // (1) Arvados error
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, &ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
+
+       cr.ContainerArvClient = &ArvErrorTestClient{}
+       cr.ContainerKeepClient = &KeepTestClient{}
+
        cr.Container.ContainerImage = hwPDH
 
        err = cr.LoadImage()
@@ -545,8 +577,13 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepError(c *C) {
        // (2) Keep error
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       kc := &KeepErrorTestClient{}
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
+
+       cr.ContainerArvClient = &ArvTestClient{}
+       cr.ContainerKeepClient = &KeepErrorTestClient{}
+
        cr.Container.ContainerImage = hwPDH
 
        err = cr.LoadImage()
@@ -556,19 +593,26 @@ func (s *TestSuite) TestLoadImageKeepError(c *C) {
 
 func (s *TestSuite) TestLoadImageCollectionError(c *C) {
        // (3) Collection doesn't contain image
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepReadErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       kc := &KeepReadErrorTestClient{}
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        cr.Container.ContainerImage = otherPDH
 
+       cr.ContainerArvClient = &ArvTestClient{}
+       cr.ContainerKeepClient = &KeepReadErrorTestClient{}
+
        err = cr.LoadImage()
        c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar")
 }
 
 func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
        // (4) Collection doesn't contain image
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       kc := &KeepReadErrorTestClient{}
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        cr.Container.ContainerImage = hwPDH
+       cr.ContainerArvClient = &ArvTestClient{}
+       cr.ContainerKeepClient = &KeepReadErrorTestClient{}
 
        err = cr.LoadImage()
        c.Check(err, NotNil)
@@ -616,6 +660,9 @@ func (s *TestSuite) TestRunContainer(c *C) {
        cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
 
+       cr.ContainerArvClient = &ArvTestClient{}
+       cr.ContainerKeepClient = &KeepTestClient{}
+
        var logs TestLogs
        cr.NewLogWriter = logs.NewTestLoggingWriter
        cr.Container.ContainerImage = hwPDH
@@ -736,7 +783,9 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
        defer kc.Close()
        cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
+       s.runner = cr
        cr.statInterval = 100 * time.Millisecond
+       cr.containerWatchdogInterval = time.Second
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
 
@@ -757,8 +806,8 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
                }
                return d, err
        }
-       cr.MkArvClient = func(token string) (IArvadosClient, error) {
-               return &ArvTestClient{secretMounts: secretMounts}, nil
+       cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
+               return &ArvTestClient{secretMounts: secretMounts}, &KeepTestClient{}, nil, nil
        }
 
        if extraMounts != nil && len(extraMounts) > 0 {
@@ -776,7 +825,15 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
        }
        if exitCode != 2 {
                c.Check(api.WasSetRunning, Equals, true)
-               c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+               var lastupdate arvadosclient.Dict
+               for _, content := range api.Content {
+                       if content["container"] != nil {
+                               lastupdate = content["container"].(arvadosclient.Dict)
+                       }
+               }
+               if lastupdate["log"] == nil {
+                       c.Errorf("no container update with non-nil log -- updates were: %v", api.Content)
+               }
        }
 
        if err != nil {
@@ -830,6 +887,24 @@ func (s *TestSuite) TestRunTimeExceeded(c *C) {
        c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*")
 }
 
+func (s *TestSuite) TestContainerWaitFails(c *C) {
+       api, _, _ := s.fullRunHelper(c, `{
+    "command": ["sleep", "3"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1
+}`, nil, 0, func(t *TestDockerClient) {
+               t.ctrExited = true
+               time.Sleep(10 * time.Second)
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*")
+}
+
 func (s *TestSuite) TestCrunchstat(c *C) {
        api, _, _ := s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
@@ -1036,8 +1111,8 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
        cr, err := NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
-       cr.MkArvClient = func(token string) (IArvadosClient, error) {
-               return &ArvTestClient{}, nil
+       cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
+               return &ArvTestClient{}, &KeepTestClient{}, nil, nil
        }
        setup(cr)
 
@@ -1109,6 +1184,8 @@ func (s *TestSuite) TestSetupMounts(c *C) {
        c.Assert(err, IsNil)
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
+       cr.ContainerArvClient = &ArvTestClient{}
+       cr.ContainerKeepClient = &KeepTestClient{}
 
        realTemp, err := ioutil.TempDir("", "crunchrun_test1-")
        c.Assert(err, IsNil)
@@ -1494,14 +1571,14 @@ func (s *TestSuite) TestStdout(c *C) {
                "runtime_constraints": {}
        }`
 
-       api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+       api, cr, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(api.CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
+       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
 }
 
 // Used by the TestStdoutWithWrongPath*()
@@ -1520,8 +1597,8 @@ func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDoc
        c.Assert(err, IsNil)
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
-       cr.MkArvClient = func(token string) (IArvadosClient, error) {
-               return &ArvTestClient{}, nil
+       cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
+               return &ArvTestClient{}, &KeepTestClient{}, nil, nil
        }
 
        err = cr.Run()
@@ -1624,14 +1701,14 @@ func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C
 
        extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
 
-       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, cr, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(api.CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
+       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
 }
 
 func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
@@ -1831,7 +1908,7 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
 }
 
 func (s *TestSuite) TestStderrMount(c *C) {
-       api, _, _ := s.fullRunHelper(c, `{
+       api, cr, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo hello;exit 1"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1853,7 +1930,7 @@ func (s *TestSuite) TestStderrMount(c *C) {
        c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
        c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
 
-       c.Check(api.CalledWith("collection.manifest_text", "./a b1946ac92492d2347c6235b4d2611184+6 0:6:out.txt\n./b 38af5c54926b620264ab1501150cf189+5 0:5:err.txt\n"), NotNil)
+       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a b1946ac92492d2347c6235b4d2611184+6 0:6:out.txt\n./b 38af5c54926b620264ab1501150cf189+5 0:5:err.txt\n"), NotNil)
 }
 
 func (s *TestSuite) TestNumberRoundTrip(c *C) {
@@ -2032,7 +2109,7 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
                "runtime_constraints": {}
        }`
 
-       api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+       api, cr, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
                content, err := ioutil.ReadFile(t.realTemp + "/tmp2/secret.conf")
                c.Check(err, IsNil)
                c.Check(content, DeepEquals, []byte("mypassword"))
@@ -2041,8 +2118,8 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(api.CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), NotNil)
-       c.Check(api.CalledWith("collection.manifest_text", ""), IsNil)
+       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), NotNil)
+       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), IsNil)
 
        // under secret mounts, not captured in output
        helperRecord = `{
@@ -2060,7 +2137,7 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
                "runtime_constraints": {}
        }`
 
-       api, _, _ = s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+       api, cr, _ = s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
                content, err := ioutil.ReadFile(t.realTemp + "/tmp2/secret.conf")
                c.Check(err, IsNil)
                c.Check(content, DeepEquals, []byte("mypassword"))
@@ -2069,8 +2146,8 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(api.CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), IsNil)
-       c.Check(api.CalledWith("collection.manifest_text", ""), NotNil)
+       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), IsNil)
+       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), NotNil)
 }
 
 type FakeProcess struct {
@@ -2080,41 +2157,3 @@ type FakeProcess struct {
 func (fp FakeProcess) CmdlineSlice() ([]string, error) {
        return fp.cmdLine, nil
 }
-
-func (s *TestSuite) helpCheckContainerd(c *C, lp func() ([]PsProcess, error)) error {
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{callraw: true}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-       cr.checkContainerd = time.Duration(100 * time.Millisecond)
-       c.Assert(err, IsNil)
-       cr.ListProcesses = lp
-
-       s.docker.fn = func(t *TestDockerClient) {
-               time.Sleep(1 * time.Second)
-               t.logWriter.Close()
-       }
-
-       err = cr.CreateContainer()
-       c.Check(err, IsNil)
-
-       err = cr.StartContainer()
-       c.Check(err, IsNil)
-
-       err = cr.WaitFinish()
-       return err
-
-}
-
-func (s *TestSuite) TestCheckContainerdPresent(c *C) {
-       err := s.helpCheckContainerd(c, func() ([]PsProcess, error) {
-               return []PsProcess{FakeProcess{[]string{"docker-containerd"}}}, nil
-       })
-       c.Check(err, IsNil)
-}
-
-func (s *TestSuite) TestCheckContainerdMissing(c *C) {
-       err := s.helpCheckContainerd(c, func() ([]PsProcess, error) {
-               return []PsProcess{FakeProcess{[]string{"abc"}}}, nil
-       })
-       c.Check(err, ErrorMatches, `'containerd' not found in process list.`)
-}
index 325affe5875108a819b3baa07aa964bcd5ef1224..563871607874f9ad44a07315ce08bfd68274a23b 100644 (file)
@@ -19,6 +19,7 @@ Type=simple
 ExecStart=/usr/bin/keep-balance -commit-pulls -commit-trash
 Restart=always
 RestartSec=10s
+Nice=19
 
 # systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
 StartLimitInterval=0
index 0e2f17c35b85df02b98df4d3e29a974d18deb17d..44d0b0ffefa743dc931eb448bcadce510e5abf92 100644 (file)
@@ -145,6 +145,16 @@ func (s *IntegrationSuite) testCadaver(c *check.C, password string, pathFunc fun
                        cmd:   "move testfile newdir0/\n",
                        match: `(?ms).*Moving .* failed.*`,
                },
+               {
+                       path:  writePath,
+                       cmd:   "lock newdir0/testfile\n",
+                       match: `(?ms).*Locking .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "unlock newdir0/testfile\nasdf\n",
+                       match: `(?ms).*Unlocking .* succeeded.*`,
+               },
                {
                        path:  writePath,
                        cmd:   "ls\n",
@@ -160,6 +170,16 @@ func (s *IntegrationSuite) testCadaver(c *check.C, password string, pathFunc fun
                        cmd:   "mkcol newdir1\n",
                        match: `(?ms).*Creating .* succeeded.*`,
                },
+               {
+                       path:  writePath,
+                       cmd:   "move newdir1/ newdir1x/\n",
+                       match: `(?ms).*Moving .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "move newdir1x newdir1\n",
+                       match: `(?ms).*Moving .* succeeded.*`,
+               },
                {
                        path:  writePath,
                        cmd:   "move newdir0/testfile newdir1/\n",
@@ -243,6 +263,11 @@ func (s *IntegrationSuite) testCadaver(c *check.C, password string, pathFunc fun
                        cmd:   "delete foo\n",
                        match: `(?ms).*Deleting .* failed:.*405 Method Not Allowed.*`,
                },
+               {
+                       path:  pdhPath,
+                       cmd:   "lock foo\n",
+                       match: `(?ms).*Locking .* failed:.*405 Method Not Allowed.*`,
+               },
        } {
                c.Logf("%s %+v", "http://"+s.testServer.Addr, trial)
                if skip != nil && skip(trial.path) {
index 95948e32505f40112cff4da72c88692d7ea6edff..6aeb7b9c48dfb2859841c72f19e2ff78c282e5aa 100644 (file)
@@ -143,20 +143,24 @@ var (
        writeMethod = map[string]bool{
                "COPY":   true,
                "DELETE": true,
+               "LOCK":   true,
                "MKCOL":  true,
                "MOVE":   true,
                "PUT":    true,
                "RMCOL":  true,
+               "UNLOCK": true,
        }
        webdavMethod = map[string]bool{
                "COPY":     true,
                "DELETE":   true,
+               "LOCK":     true,
                "MKCOL":    true,
                "MOVE":     true,
                "OPTIONS":  true,
                "PROPFIND": true,
                "PUT":      true,
                "RMCOL":    true,
+               "UNLOCK":   true,
        }
        browserMethod = map[string]bool{
                "GET":  true,
index 5b23c9c5fa9f10bffec55d48e6950fd0ac76d639..f9b753869a70e17bbf2d5b36f7596a6b0589b287 100644 (file)
@@ -100,6 +100,11 @@ func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
        if !fs.writing {
                return errReadOnly
        }
+       if strings.HasSuffix(oldName, "/") {
+               // WebDAV "MOVE foo/ bar/" means rename foo to bar.
+               oldName = oldName[:len(oldName)-1]
+               newName = strings.TrimSuffix(newName, "/")
+       }
        fs.makeparents(newName)
        return fs.collfs.Rename(oldName, newName)
 }
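WebDAV clients address a directory with a trailing slash, but the underlying collection filesystem renames plain paths, so the handler strips the slashes first. A sketch of the normalization, matching the Rename fix above:

    package main

    import (
        "fmt"
        "strings"
    )

    // normalizeRename maps WebDAV "MOVE foo/ bar/" onto a plain rename
    // of foo to bar. Only a trailing slash on the source marks a
    // directory move; the destination slash is trimmed to match.
    func normalizeRename(oldName, newName string) (string, string) {
        if strings.HasSuffix(oldName, "/") {
            oldName = strings.TrimSuffix(oldName, "/")
            newName = strings.TrimSuffix(newName, "/")
        }
        return oldName, newName
    }

    func main() {
        fmt.Println(normalizeRename("newdir1/", "newdir1x/"))
    }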
index 7dea2b66531cd3eb6ebda54e0d4786ea8e99bcea..1f82f3f4fc79d82c8f1a6fa1586410d94c9d76dc 100644 (file)
@@ -66,7 +66,7 @@ func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.R
                        remoteID := part[1:6]
                        remote, ok := cluster.RemoteClusters[remoteID]
                        if !ok {
-                               http.Error(w, "remote cluster not configured", http.StatusBadGateway)
+                               http.Error(w, "remote cluster not configured", http.StatusBadRequest)
                                return
                        }
                        kc, err := rp.remoteClient(remoteID, remote, token)
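
(The status-code change reclassifies the failure: a locator naming a cluster this keepstore was never configured to proxy for is a bad request (400), not a failure of an upstream service (502). A toy sketch of that split, with forward standing in for the real proxied fetch:)

    package main

    import (
        "errors"
        "fmt"
        "net/http"
        "net/http/httptest"
    )

    // forward is a stand-in for the real proxied GET; assume it can fail.
    func forward(addr string) error { return errors.New("upstream unreachable") }

    func proxyGet(w http.ResponseWriter, remoteID string, remotes map[string]string) {
        addr, ok := remotes[remoteID]
        if !ok {
            // The caller named an unknown cluster: their mistake, so 4xx.
            http.Error(w, "remote cluster not configured", http.StatusBadRequest)
            return
        }
        if err := forward(addr); err != nil {
            // A configured remote actually failed: gateway problem, 5xx.
            http.Error(w, err.Error(), http.StatusBadGateway)
        }
    }

    func main() {
        w := httptest.NewRecorder()
        proxyGet(w, "zzzzz", map[string]string{})
        fmt.Println(w.Code) // 400
    }
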
index f78084dbcf0ce0c9ae5cf2c9f02724217f9ff2bb..fb978fe2ba41fbdf9895c0c718d2ca6c925d5f9c 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "bytes"
        "context"
+       "crypto/sha256"
        "encoding/base64"
        "encoding/hex"
        "flag"
@@ -402,6 +403,14 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
                        return err
                }
                opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
+               // In AWS regions that use V4 signatures, we need to
+               // provide ContentSHA256 up front. Otherwise, the S3
+               // library reads the request body (from our buffer)
+               // into another new buffer in order to compute the
+               // SHA256 before sending the request -- which would
+               // mean consuming 128 MiB of memory for the duration
+               // of a 64 MiB write.
+               opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
        }
 
        // Send the block data through a pipe, so that (if we need to)
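
(The comment above captures the tradeoff: hashing the block once up front costs one extra pass of CPU, whereas letting the S3 library compute the V4-signature SHA256 itself means it buffers the whole request body into a second copy first. A standalone illustration of the precomputation; ContentSHA256 is the goamz option set above:)

    package main

    import (
        "crypto/sha256"
        "fmt"
    )

    func main() {
        block := make([]byte, 64<<20) // a 64 MiB Keep block
        // One in-place pass over the buffer; no second copy is made.
        sum := fmt.Sprintf("%x", sha256.Sum256(block))
        fmt.Println(sum) // hex digest handed to the S3 client as ContentSHA256
    }
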
@@ -501,16 +510,15 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                Bucket:   v.bucket.Bucket,
                Prefix:   prefix,
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
        recentL := s3Lister{
                Bucket:   v.bucket.Bucket,
                Prefix:   "recent/" + prefix,
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
-       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
-       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
-       for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
-               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+       for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
                if data.Key >= "g" {
                        // Conveniently, "recent/*" and "trash/*" are
                        // lexically greater than all hex-encoded data
@@ -529,15 +537,13 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                stamp := data
 
                // Advance to the corresponding recent/X marker, if any
-               for recent != nil {
+               for recent != nil && recentL.Error() == nil {
                        if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
                                recent = recentL.Next()
-                               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                continue
                        } else if cmp == 0 {
                                stamp = recent
                                recent = recentL.Next()
-                               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                break
                        } else {
                                // recent/X marker is missing: we'll
@@ -546,13 +552,16 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                                break
                        }
                }
+               if err := recentL.Error(); err != nil {
+                       return err
+               }
                t, err := time.Parse(time.RFC3339, stamp.LastModified)
                if err != nil {
                        return err
                }
                fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
        }
-       return nil
+       return dataL.Error()
 }
 
 // Trash a Keep block.
@@ -624,8 +633,10 @@ func (v *S3Volume) safeCopy(dst, src string) error {
                MetadataDirective: "REPLACE",
        }, v.bucket.Name+"/"+src)
        err = v.translateError(err)
-       if err != nil {
+       if os.IsNotExist(err) {
                return err
+       } else if err != nil {
+               return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
        }
        if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
                return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
@@ -871,6 +882,7 @@ func (v *S3Volume) EmptyTrash() {
                Bucket:   v.bucket.Bucket,
                Prefix:   "trash/",
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
        for trash := trashL.First(); trash != nil; trash = trashL.Next() {
                todo <- trash
@@ -888,6 +900,7 @@ type s3Lister struct {
        Bucket     *s3.Bucket
        Prefix     string
        PageSize   int
+       Stats      *s3bucketStats
        nextMarker string
        buf        []s3.Key
        err        error
@@ -916,6 +929,7 @@ func (lister *s3Lister) Error() error {
 }
 
 func (lister *s3Lister) getPage() {
+       lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
        resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
        lister.nextMarker = ""
        if err != nil {
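
(Two things change in the lister: every page fetch now ticks the stats counters exactly once, inside getPage, instead of the caller guessing when pages are fetched; and First/Next return nil on error as well as at end-of-list, so IndexTo's loops must consult Error() to tell the two apart. A toy model of that iteration contract:)

    package main

    import (
        "errors"
        "fmt"
    )

    // pager is a toy stand-in for s3Lister: Next returns nil both at the end
    // of the list and after an error, so callers must check Error().
    type pager struct {
        items []string
        i     int
        err   error
    }

    func (p *pager) First() *string { p.i = 0; return p.Next() }

    func (p *pager) Next() *string {
        if p.err != nil || p.i >= len(p.items) {
            return nil
        }
        if p.i == 1 {
            p.err = errors.New("simulated List failure") // mid-stream error
            return nil
        }
        v := &p.items[p.i]
        p.i++
        return v
    }

    func (p *pager) Error() error { return p.err }

    func main() {
        p := &pager{items: []string{"a", "b", "c"}}
        for v := p.First(); v != nil && p.Error() == nil; v = p.Next() {
            fmt.Println(*v) // prints "a" only
        }
        fmt.Println("aborted:", p.Error()) // the error must not be swallowed
    }
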
index cbb831ebc000849c71fedd945a4574ea6f4f1171..8a9fedfb7007ca21ef1d5d2e482ce66464fafe1a 100644 (file)
@@ -51,24 +51,24 @@ func TrashItem(trashRequest TrashRequest) {
        for _, volume := range volumes {
                mtime, err := volume.Mtime(trashRequest.Locator)
                if err != nil {
-                       log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
+                       log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
                        continue
                }
                if trashRequest.BlockMtime != mtime.UnixNano() {
-                       log.Printf("%v Delete(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
+                       log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
                        continue
                }
 
                if !theConfig.EnableDelete {
-                       err = errors.New("did not delete block because EnableDelete is false")
+                       err = errors.New("skipping because EnableDelete is false")
                } else {
                        err = volume.Trash(trashRequest.Locator)
                }
 
                if err != nil {
-                       log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
+                       log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
                } else {
-                       log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
+                       log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator)
                }
        }
 }
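
(The renamed log messages reflect what the function actually does: Trash, not Delete. The guards themselves are the interesting part: a block is only trashed if its current mtime still matches the one recorded when the trash list was generated, so a block re-written in the meantime survives, and EnableDelete acts as a global kill switch. A condensed sketch of the decision, with shouldTrash as a hypothetical helper:)

    package main

    import (
        "fmt"
        "time"
    )

    // shouldTrash condenses TrashItem's guards: the stored mtime must match
    // the trash-list snapshot, and EnableDelete must be on, before a block
    // may be trashed.
    func shouldTrash(blockMtime int64, storedMtime time.Time, enableDelete bool) (bool, string) {
        if blockMtime != storedMtime.UnixNano() {
            return false, "stored mtime does not match trash list value"
        }
        if !enableDelete {
            return false, "skipping because EnableDelete is false"
        }
        return true, ""
    }

    func main() {
        now := time.Now()
        ok, why := shouldTrash(now.UnixNano(), now, false)
        fmt.Println(ok, why) // false skipping because EnableDelete is false
    }
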
index a26c396e2e1eb48d35407ab3cc561892c3dd0980..69fc2cedee7c41b6a637e1b7e1f920cb8c17e244 100755 (executable)
@@ -75,6 +75,10 @@ gethost() {
     fi
 }
 
+getclusterid() {
+    docker exec $ARVBOX_CONTAINER cat /var/lib/arvados/api_uuid_prefix
+}
+
 updateconf() {
     if test -f ~/.config/arvados/$ARVBOX_CONTAINER.conf ; then
         sed "s/ARVADOS_API_HOST=.*/ARVADOS_API_HOST=$(gethost):8000/" <$HOME/.config/arvados/$ARVBOX_CONTAINER.conf >$HOME/.config/arvados/$ARVBOX_CONTAINER.conf.tmp
@@ -382,8 +386,9 @@ case "$subcmd" in
         ;;
 
     status)
-        echo "Selected: $ARVBOX_CONTAINER"
+        echo "Container: $ARVBOX_CONTAINER"
         if docker ps -a --filter "status=running" | grep -E "$ARVBOX_CONTAINER$" -q ; then
+           echo "Cluster id: $(getclusterid)"
             echo "Status: running"
             echo "Container IP: $(getip)"
             echo "Published host: $(gethost)"
@@ -453,11 +458,11 @@ case "$subcmd" in
 
     sv)
         if test -n "$1" -a -n "$2" ; then
-            exec docker exec -ti $ARVBOX_CONTAINER sv "$@"
+            exec docker exec $ARVBOX_CONTAINER sv "$@"
         else
             echo "Usage: $0 $subcmd <start|stop|restart> <service>"
             echo "Available services:"
-            exec docker exec -ti $ARVBOX_CONTAINER ls /etc/service
+            exec docker exec $ARVBOX_CONTAINER ls /etc/service
         fi
         ;;
 
index aee25beabdf98e6ccd86bbb816e49ec19d41f781..ec296d21d1c25a92c14639c36658c4b0aa10bce5 100644 (file)
@@ -3,24 +3,24 @@
        "ignore": "test",
        "package": [
                {
-                       "checksumSHA1": "jf7K+UTQNIzRdlG5F4zX/8b++/E=",
+                       "checksumSHA1": "j4je0EzPGzjb6INLY1BHZ+hyMjc=",
                        "origin": "github.com/curoverse/goamz/aws",
                        "path": "github.com/AdRoll/goamz/aws",
-                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+                       "revision": "888b4804f2653cd35ebcc95f046079e63b5b2799",
                        "revisionTime": "2017-07-27T13:52:37Z"
                },
                {
-                       "checksumSHA1": "9nUwQXI+pNxZo6bnR7NslpMpfPI=",
+                       "checksumSHA1": "0+n3cT6e7sQCCbBAH8zg6neiHTk=",
                        "origin": "github.com/curoverse/goamz/s3",
                        "path": "github.com/AdRoll/goamz/s3",
-                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+                       "revision": "888b4804f2653cd35ebcc95f046079e63b5b2799",
                        "revisionTime": "2017-07-27T13:52:37Z"
                },
                {
                        "checksumSHA1": "tvxbsTkdjB0C/uxEglqD6JfVnMg=",
                        "origin": "github.com/curoverse/goamz/s3/s3test",
                        "path": "github.com/AdRoll/goamz/s3/s3test",
-                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+                       "revision": "888b4804f2653cd35ebcc95f046079e63b5b2799",
                        "revisionTime": "2017-07-27T13:52:37Z"
                },
                {
                        "revision": "d682213848ed68c0a260ca37d6dd5ace8423f5ba",
                        "revisionTime": "2017-12-05T20:32:29Z"
                },
-               {
-                       "checksumSHA1": "st4vb0GmDeoKbsfxdpNZ2MPl76M=",
-                       "path": "github.com/StackExchange/wmi",
-                       "revision": "cdffdb33acae0e14efff2628f9bae377b597840e",
-                       "revisionTime": "2018-04-12T20:51:11Z"
-               },
                {
                        "checksumSHA1": "spyv5/YFBjYyZLZa1U2LBfDR8PM=",
                        "path": "github.com/beorn7/perks/quantile",
                        "revision": "0ca9ea5df5451ffdf184b4428c902747c2c11cd7",
                        "revisionTime": "2017-03-27T23:54:44Z"
                },
-               {
-                       "checksumSHA1": "Kqv7bA4oJG0nPwQvGWDwGGaKONo=",
-                       "path": "github.com/go-ole/go-ole",
-                       "revision": "7a0fa49edf48165190530c675167e2f319a05268",
-                       "revisionTime": "2018-06-25T08:58:08Z"
-               },
-               {
-                       "checksumSHA1": "PArleDBtadu2qO4hJwHR8a3IOTA=",
-                       "path": "github.com/go-ole/go-ole/oleutil",
-                       "revision": "7a0fa49edf48165190530c675167e2f319a05268",
-                       "revisionTime": "2018-06-25T08:58:08Z"
-               },
                {
                        "checksumSHA1": "8UEp6v0Dczw/SlasE0DivB0mAHA=",
                        "path": "github.com/gogo/protobuf/jsonpb",
                        "revision": "d14ea06fba99483203c19d92cfcd13ebe73135f4",
                        "revisionTime": "2015-07-11T00:45:18Z"
                },
+               {
+                       "checksumSHA1": "khL6oKjx81rAZKW+36050b7f5As=",
+                       "path": "github.com/jmcvetta/randutil",
+                       "revision": "2bb1b664bcff821e02b2a0644cd29c7e824d54f8",
+                       "revisionTime": "2015-08-17T12:26:01Z"
+               },
                {
                        "checksumSHA1": "oX6jFQD74oOApvDIhOzW2dXpg5Q=",
                        "path": "github.com/kevinburke/ssh_config",
                        "revision": "1744e2970ca51c86172c8190fadad617561ed6e7",
                        "revisionTime": "2017-11-10T11:01:46Z"
                },
-               {
-                       "checksumSHA1": "q14d3C3xvWevU3dSv4P5K0+OSD0=",
-                       "path": "github.com/shirou/gopsutil/cpu",
-                       "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
-                       "revisionTime": "2018-07-05T13:28:12Z"
-               },
-               {
-                       "checksumSHA1": "LZ9GloiGLTISmQ4dalK2XspH6Wo=",
-                       "path": "github.com/shirou/gopsutil/host",
-                       "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
-                       "revisionTime": "2018-07-05T13:28:12Z"
-               },
-               {
-                       "checksumSHA1": "cyoqI0gryzjxGTkaAfyUqMiuUR0=",
-                       "path": "github.com/shirou/gopsutil/internal/common",
-                       "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
-                       "revisionTime": "2018-07-05T13:28:12Z"
-               },
-               {
-                       "checksumSHA1": "vEQLjAO5T5K9zXblEMYdoaBZzj0=",
-                       "path": "github.com/shirou/gopsutil/mem",
-                       "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
-                       "revisionTime": "2018-07-05T13:28:12Z"
-               },
-               {
-                       "checksumSHA1": "KMWFRa0DVpabo9d8euB4RYjUBQE=",
-                       "path": "github.com/shirou/gopsutil/net",
-                       "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
-                       "revisionTime": "2018-07-05T13:28:12Z"
-               },
-               {
-                       "checksumSHA1": "fbO7c1gv1kSvWKOb/+5HUWFkBaA=",
-                       "path": "github.com/shirou/gopsutil/process",
-                       "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
-                       "revisionTime": "2018-07-05T13:28:12Z"
-               },
-               {
-                       "checksumSHA1": "Nve7SpDmjsv6+rhkXAkfg/UQx94=",
-                       "path": "github.com/shirou/w32",
-                       "revision": "bb4de0191aa41b5507caa14b0650cdbddcd9280b",
-                       "revisionTime": "2016-09-30T03:27:40Z"
-               },
                {
                        "checksumSHA1": "8QeSG127zQqbA+YfkO1WkKx/iUI=",
                        "path": "github.com/src-d/gcfg",