railties (>= 4)
request_store (~> 1.0)
logstash-event (1.2.02)
- loofah (2.2.2)
+ loofah (2.2.3)
crass (~> 1.0.2)
nokogiri (>= 1.5.9)
mail (2.7.0)
net-ssh (4.2.0)
net-ssh-gateway (2.0.0)
net-ssh (>= 4.0.0)
- nokogiri (1.8.2)
+ nokogiri (1.8.5)
mini_portile2 (~> 2.3.0)
npm-rails (0.2.1)
rails (>= 3.2)
# Install RVM
ADD generated/rvm.asc /tmp/
-RUN gpg --import /tmp/rvm.asc && \
+RUN gpg --no-tty --import /tmp/rvm.asc && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
/usr/local/rvm/bin/rvm alias create default ruby-2.3 && \
# Install RVM
ADD generated/rvm.asc /tmp/
-RUN gpg --import /tmp/rvm.asc && \
+RUN gpg --no-tty --import /tmp/rvm.asc && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
/usr/local/rvm/bin/rvm alias create default ruby-2.3
# before trying "go test". Otherwise, coverage-reporting
# mode makes Go show the wrong line numbers when reporting
# compilation errors.
- go get -t "git.curoverse.com/arvados.git/$1" && \
+ go get -ldflags "-X main.version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}" -t "git.curoverse.com/arvados.git/$1" && \
cd "$GOPATH/src/git.curoverse.com/arvados.git/$1" && \
[[ -z "$(gofmt -e -d . | tee /dev/stderr)" ]] && \
if [[ -n "${testargs[$1]}" ]]
timer_reset
if [[ "$2" == "go" ]]
then
- go get -t "git.curoverse.com/arvados.git/$1"
+ go get -ldflags "-X main.version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}" -t "git.curoverse.com/arvados.git/$1"
elif [[ "$2" == "pip" ]]
then
# $3 can name a path directory for us to use, including trailing
- admin/spot-instances.html.textile.liquid
- Other:
- admin/collection-versioning.html.textile.liquid
+ - admin/federation.html.textile.liquid
installguide:
- Overview:
- install/index.html.textile.liquid
--- /dev/null
+---
+layout: default
+navsection: admin
+title: Configuring federation
+...
+
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+This page describes how to enable and configure federation capabilities between clusters.
+
+An overview of how this feature works is discussed in the "architecture section":{{site.baseurl}}/architecture/federation.html.
+
+h3. API Server configuration
+
+To accept users from remote clusters, some settings need to be added to the @application.yml@ file. There are two ways in which a remote cluster can be identified: either explicitly, by listing its prefix-to-hostname mapping, or implicitly, by assuming the given remote cluster is public and belongs to the @.arvadosapi.com@ subdomain.
+
+For example, if you want to set up a private cluster federation, the following configuration will allow access only to users from @clsr2@ and @clsr3@:
+
+<pre>
+production:
+ remote_hosts:
+ clsr2: api.cluster2.com
+ clsr3: api.cluster3.com
+ remote_hosts_via_dns: false
+ auto_activate_users_from: []
+</pre>
+
+The additional @auto_activate_users_from@ setting can be used to allow users from the listed clusters not only to read but also to create and update objects on the local cluster. This feature is covered in more detail in the "user activation section":{{site.baseurl}}/admin/activation.html. In the current example, only manually activated remote users would have full access to the local cluster.
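+
+For example (a hypothetical variation of the configuration above), to auto-activate users coming from @clsr2@ while leaving @clsr3@ users subject to manual activation:
+
+<pre>
+production:
+  remote_hosts:
+    clsr2: api.cluster2.com
+    clsr3: api.cluster3.com
+  remote_hosts_via_dns: false
+  auto_activate_users_from: ["clsr2"]
+</pre>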
+
+h3. Arvados controller & keepstores configuration
+
+Both @arvados-controller@ and @keepstore@ services also need to be configured, as they proxy requests to remote clusters when needed.
+
+Continuing the previous example, the necessary settings should be added to the @/etc/arvados/config.yml@ file as follows:
+
+<pre>
+Clusters:
+ clsr1:
+ RemoteClusters:
+ clsr2:
+ Host: api.cluster2.com
+ Proxy: true
+ clsr3:
+ Host: api.cluster3.com
+ Proxy: true
+</pre>
+
+Similar settings should be added to @clsr2@ & @clsr3@ hosts, so that all clusters in the federation can talk to each other.
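+
+As an illustration (hostnames assumed for this example), the corresponding @/etc/arvados/config.yml@ on @clsr2@ would list its two peers:
+
+<pre>
+Clusters:
+  clsr2:
+    RemoteClusters:
+      clsr1:
+        Host: api.cluster1.com
+        Proxy: true
+      clsr3:
+        Host: api.cluster3.com
+        Proxy: true
+</pre>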
+
+h3. Testing
+
+Following the above example, let's suppose @clsr1@ is our "home cluster", that is to say, we use our @clsr1@ user account as our federated identity, and both @clsr2@ and @clsr3@ remote clusters are set up to allow users from @clsr1@ and to auto-activate them. The first thing to do is to log into a remote workbench using the local user token. This can be done following these steps:
+
+1. Log into the local workbench and get the user token
+2. Visit the remote workbench specifying the local user token by URL: @https://workbench.cluster2.com?api_token=token_from_clsr1@
+3. You should now be logged into @clsr2@ with your account from @clsr1@
+
+To further test the federation setup, you can create a collection on @clsr2@, upload some files, and copy its UUID. Next, while logged into a shell node on your home cluster, you should be able to get that collection by running:
+
+<pre>
+user@clsr1:~$ arv collection get --uuid clsr2-xvhdp-xxxxxxxxxxxxxxx
+</pre>
+
+The returned collection metadata should show the local user's uuid in the @owner_uuid@ field. This tests that the @arvados-controller@ service is proxying requests correctly.
+
+One last test can be performed to confirm that the @keepstore@ services also recognize remote cluster prefixes and proxy the requests. You can ask for the previously created collection using any of the usual tools, for example:
+
+<pre>
+user@clsr1:~$ arv-get clsr2-xvhdp-xxxxxxxxxxxxxxx/uploaded_file .
+</pre>
TODO: extract this information based on git commit messages and generate changelogs / release notes automatically.
{% endcomment %}
-h3. 2018-07-31: "#13497":https://dev.arvados.org/issues/13497 "db5107dca":https://dev.arvados.org/projects/arvados/repository/revisions/db5107dca adds a new system service, arvados-controller
-* "Install the controller":../install/install-controller.html after upgrading your system.
-* Verify your setup by confirming that API calls appear in the controller's logs (_e.g._, @journalctl -fu arvados-controller@) while loading a workbench page.
+h3. v1.2.0 (2018-09-05)
-h3. 2018-04-05: v1.1.4 regression in arvados-cwl-runner for workflows that rely on implicit discovery of secondaryFiles
+h4. Regenerate Postgres table statistics
-h4. Secondary files missing from toplevel workflow inputs
+It is recommended to regenerate the Postgres table statistics after upgrading to v1.2.0. If autovacuum is enabled on your installation, the following script will do the trick:
+
+<pre>
+#!/bin/bash
+
+set -e
+set -u
+
+tables=$(echo "\dt" | psql arvados_production | grep public | awk '{print $3}')
+
+for t in $tables; do
+ echo "echo 'analyze $t' | psql arvados_production"
+ time echo "analyze $t" | psql arvados_production
+done
+</pre>
+
+If you also need to vacuum, you can adapt the script to run @vacuum analyze@ instead of @analyze@.
+
+h4. New component: arvados-controller
+
+Commit "db5107dca":https://dev.arvados.org/projects/arvados/repository/revisions/db5107dca adds a new system service, arvados-controller. More detail is available in story "#13497":https://dev.arvados.org/issues/13497.
+
+To add the Arvados Controller to your system, please refer to the "installation instructions":../install/install-controller.html after upgrading your system to 1.2.0.
+
+Verify your setup by confirming that API calls appear in the controller's logs (_e.g._, @journalctl -fu arvados-controller@) while loading a workbench page.
+
+h3. v1.1.4 (2018-04-10)
+
+h4. arvados-cwl-runner regressions (2018-04-05)
+
+<strong>Secondary files missing from toplevel workflow inputs</strong>
+
+This only affects workflows that rely on implicit discovery of secondaryFiles.
If a workflow input does not declare @secondaryFiles@ corresponding to the @secondaryFiles@ of workflow steps which use the input, the workflow would inconsistently succeed or fail depending on whether the input values were specified as local files or referenced an existing collection (and whether the existing collection contained the secondary files or not). To ensure consistent behavior, the workflow is now required to declare in the top level workflow inputs any secondaryFiles that are expected by workflow steps.
</code></pre>
</notextile>
-h4. Secondary files on default file inputs
+This bug has been fixed in Arvados release v1.2.0.
+
+<strong>Secondary files on default file inputs</strong>
-Due to a bug in Arvados v1.1.4, @File@ inputs that have default values and also expect @secondaryFiles@ and will fail to upload default @secondaryFiles@. As an example, the following case will fail:
+@File@ inputs that have default values and also expect @secondaryFiles@ will fail to upload the default @secondaryFiles@. As an example, the following case will fail:
<pre>
class: CommandLineTool
</code></pre>
</notextile>
-This bug will be fixed in an upcoming release of Arvados.
+This bug has been fixed in Arvados release v1.2.0.
+
+h3. v1.1.3 (2018-02-08)
+
+There are no special upgrade notes for this release.
+
+h3. v1.1.2 (2017-12-22)
+
+h4. The minimum version for Postgres is now 9.4 (2017-12-08)
+
+As part of story "#11908":https://dev.arvados.org/issues/11908, commit "8f987a9271":https://dev.arvados.org/projects/arvados/repository/revisions/8f987a9271 introduces a dependency on Postgres 9.4. Previously, Arvados required Postgres 9.3.
-h3. 2017-12-08: "#11908":https://dev.arvados.org/issues/11908 "8f987a9271":https://dev.arvados.org/projects/arvados/repository/revisions/8f987a9271 now requires minimum of Postgres 9.4 (previously 9.3)
* Debian 8 (pg 9.4) and Debian 9 (pg 9.6) do not require an upgrade
* Ubuntu 16.04 (pg 9.5) does not require an upgrade
* Ubuntu 14.04 (pg 9.3) requires upgrade to Postgres 9.4: https://www.postgresql.org/download/linux/ubuntu/
*# Install the @rh-postgresql94@ backport package from either Software Collections: http://doc.arvados.org/install/install-postgresql.html or the Postgres developers: https://www.postgresql.org/download/linux/redhat/
*# Restore from the backup using @psql@
-h3. 2017-09-25: "#12032":https://dev.arvados.org/issues/12032 "68bdf4cbb":https://dev.arvados.org/projects/arvados/repository/revisions/68bdf4cbb now requires minimum of Postgres 9.3 (previously 9.1)
+h3. v1.1.1 (2017-11-30)
+
+There are no special upgrade notes for this release.
+
+h3. v1.1.0 (2017-10-24)
+
+h4. The minimum version for Postgres is now 9.3 (2017-09-25)
+
+As part of story "#12032":https://dev.arvados.org/issues/12032, commit "68bdf4cbb1":https://dev.arvados.org/projects/arvados/repository/revisions/68bdf4cbb1 introduces a dependency on Postgres 9.3. Previously, Arvados required Postgres 9.1.
+
* Debian 8 (pg 9.4) and Debian 9 (pg 9.6) do not require an upgrade
* Ubuntu 16.04 (pg 9.5) does not require an upgrade
* Ubuntu 14.04 (pg 9.3) is compatible, however upgrading to Postgres 9.4 is recommended: https://www.postgresql.org/download/linux/ubuntu/
*# Install the @rh-postgresql94@ backport package from either Software Collections: http://doc.arvados.org/install/install-postgresql.html or the Postgres developers: https://www.postgresql.org/download/linux/redhat/
*# Restore from the backup using @psql@
-h3. 2017-06-30: "#11807":https://dev.arvados.org/issues/11807 "55aafbb":https://dev.arvados.org/projects/arvados/repository/revisions/55aafbb converts old "jobs" database records from YAML to JSON, making the upgrade process slower than usual.
+h3. Older versions
+
+h4. Upgrade slower than usual (2017-06-30)
+
+As part of story "#11807":https://dev.arvados.org/issues/11807, commit "55aafbb":https://dev.arvados.org/projects/arvados/repository/revisions/55aafbb converts old "jobs" database records from YAML to JSON, making the upgrade process slower than usual.
+
* The migration can take some time if your database contains a substantial number of YAML-serialized rows (i.e., you installed Arvados before March 3, 2017 "660a614":https://dev.arvados.org/projects/arvados/repository/revisions/660a614 and used the jobs/pipelines APIs). Otherwise, the upgrade will be no slower than usual.
* The conversion runs as a database migration, i.e., during the deb/rpm package upgrade process, while your API server is unavailable.
* Expect it to take about 1 minute per 20K jobs that have ever been created/run.
-h3. 2017-06-05: "#9005":https://dev.arvados.org/issues/9005 "cb230b0":https://dev.arvados.org/projects/arvados/repository/revisions/cb230b0 reduces service discovery overhead in keep-web requests.
+h4. Service discovery overhead change in keep-web (2017-06-05)
+
+As part of story "#9005":https://dev.arvados.org/issues/9005, commit "cb230b0":https://dev.arvados.org/projects/arvados/repository/revisions/cb230b0 reduces service discovery overhead in keep-web requests.
+
* When upgrading keep-web _or keepproxy_ to/past this version, make sure to update API server as well. Otherwise, a bad token in a request can cause keep-web to fail future requests until either keep-web restarts or API server gets upgraded.
-h3. 2017-04-12: "#11349":https://dev.arvados.org/issues/11349 "2c094e2":https://dev.arvados.org/projects/arvados/repository/revisions/2c094e2 adds a "management" http server to nodemanager.
+h4. Node manager now has an http endpoint for management (2017-04-12)
+
+As part of story "#11349":https://dev.arvados.org/issues/11349, commit "2c094e2":https://dev.arvados.org/projects/arvados/repository/revisions/2c094e2 adds a "management" http server to nodemanager.
+
* To enable it, add to your configuration file: <pre>[Manage]
address = 127.0.0.1
port = 8989</pre> (see example configuration files in source:services/nodemanager/doc or https://doc.arvados.org/install/install-nodemanager.html for more info)
* The server responds to @http://{address}:{port}/status.json@ with a summary of how many nodes are in each state (booting, busy, shutdown, etc.)
-h3. 2017-03-23: "#10766":https://dev.arvados.org/issues/10766 "e8cc0d7":https://dev.arvados.org/projects/arvados/repository/revisions/e8cc0d7 replaces puma with arvados-ws as the recommended websocket server.
+h4. New websockets component (2017-03-23)
+
+As part of story "#10766":https://dev.arvados.org/issues/10766, commit "e8cc0d7":https://dev.arvados.org/projects/arvados/repository/revisions/e8cc0d7 replaces puma with arvados-ws as the recommended websocket server.
* See http://doc.arvados.org/install/install-ws.html for install/upgrade instructions.
* Remove the old puma server after the upgrade is complete. Example, with runit: <pre>
$ sudo sv down /etc/sv/puma
$ systemctl stop puma
</pre>
-h3. 2017-03-06: "#11168":https://dev.arvados.org/issues/11168 "660a614":https://dev.arvados.org/projects/arvados/repository/revisions/660a614 uses JSON instead of YAML to encode hashes and arrays in the database.
+h4. Change of database encoding for hashes and arrays (2017-03-06)
+
+As part of story "#11168":https://dev.arvados.org/issues/11168, commit "660a614":https://dev.arvados.org/projects/arvados/repository/revisions/660a614 uses JSON instead of YAML to encode hashes and arrays in the database.
+
* Aside from a slight performance improvement, this should have no externally visible effect.
* Downgrading past this version is not supported, and is likely to cause errors. If this happens, the solution is to upgrade past this version.
* After upgrading, make sure to restart puma and crunch-dispatch-* processes.
-h3. 2017-02-03: "#10969":https://dev.arvados.org/issues/10969 "74a9dec":https://dev.arvados.org/projects/arvados/repository/revisions/74a9dec introduces a Docker image format compatibility check: the @arv keep docker@ command prevents users from inadvertently saving docker images that compute nodes won't be able to run.
+h4. Docker image format compatibility check (2017-02-03)
+
+As part of story "#10969":https://dev.arvados.org/issues/10969, commit "74a9dec":https://dev.arvados.org/projects/arvados/repository/revisions/74a9dec introduces a Docker image format compatibility check: the @arv keep docker@ command prevents users from inadvertently saving docker images that compute nodes won't be able to run.
* If your compute nodes run a version of *docker older than 1.10* you must override the default by adding to your API server configuration (@/etc/arvados/api/application.yml@): <pre><code class="yaml">docker_image_formats: ["v1"]</code></pre>
* Refer to the comments above @docker_image_formats@ in @/var/www/arvados-api/current/config/application.default.yml@ or source:services/api/config/application.default.yml or issue "#10969":https://dev.arvados.org/issues/10969 for more detail.
* *NOTE:* This does *not* include any support for migrating existing Docker images from v1 to v2 format. This will come later: for now, sites running Docker 1.9 or earlier should still *avoid upgrading Docker further than 1.9.*
-h3. 2016-09-27: several Debian and RPM packages -- keep-balance ("d9eec0b":https://dev.arvados.org/projects/arvados/repository/revisions/d9eec0b), keep-web ("3399e63":https://dev.arvados.org/projects/arvados/repository/revisions/3399e63), keepproxy ("6de67b6":https://dev.arvados.org/projects/arvados/repository/revisions/6de67b6), and arvados-git-httpd ("9e27ddf":https://dev.arvados.org/projects/arvados/repository/revisions/9e27ddf) -- now enable their respective components using systemd. These components prefer YAML configuration files over command line flags ("3bbe1cd":https://dev.arvados.org/projects/arvados/repository/revisions/3bbe1cd).
+h4. Debian and RPM packages now have systemd unit files (2016-09-27)
+
+Several Debian and RPM packages -- keep-balance ("d9eec0b":https://dev.arvados.org/projects/arvados/repository/revisions/d9eec0b), keep-web ("3399e63":https://dev.arvados.org/projects/arvados/repository/revisions/3399e63), keepproxy ("6de67b6":https://dev.arvados.org/projects/arvados/repository/revisions/6de67b6), and arvados-git-httpd ("9e27ddf":https://dev.arvados.org/projects/arvados/repository/revisions/9e27ddf) -- now enable their respective components using systemd. These components prefer YAML configuration files over command line flags ("3bbe1cd":https://dev.arvados.org/projects/arvados/repository/revisions/3bbe1cd).
+
* On Debian-based systems using systemd, services are enabled automatically when packages are installed.
* On RedHat-based systems using systemd, unit files are installed but services must be enabled explicitly: e.g., <code>"sudo systemctl enable keep-web; sudo systemctl start keep-web"</code>.
* The new systemd-supervised services will not start up successfully until configuration files are installed in /etc/arvados/: e.g., <code>"Sep 26 18:23:55 62751f5bb946 keep-web[74]: 2016/09/26 18:23:55 open /etc/arvados/keep-web/keep-web.yml: no such file or directory"</code>
** keepproxy - /etc/arvados/keepproxy/keepproxy.yml
** arvados-git-httpd - /etc/arvados/arv-git-httpd/arv-git-httpd.yml
-h3. 2016-05-31: "ae72b172c8":https://dev.arvados.org/projects/arvados/repository/revisions/ae72b172c8 and "3aae316c25":https://dev.arvados.org/projects/arvados/repository/revisions/3aae316c25 install Python modules and scripts to different locations on the filesystem.
+h4. Installation paths for Python modules and script changed (2016-05-31)
+
+Commits "ae72b172c8":https://dev.arvados.org/projects/arvados/repository/revisions/ae72b172c8 and "3aae316c25":https://dev.arvados.org/projects/arvados/repository/revisions/3aae316c25 change the filesystem location where Python modules and scripts are installed.
+
* Previous packages installed these files to the distribution's preferred path under @/usr/local@ (or the equivalent location in a Software Collection). Now they get installed to a path under @/usr@. This improves compatibility with other Python packages provided by the distribution. See "#9242":https://dev.arvados.org/issues/9242 for more background.
* If you simply import Python modules from scripts, or call Python tools relying on $PATH, you don't need to make any changes. If you have hardcoded full paths to some of these files (e.g., in symbolic links or configuration files), you will need to update those paths after this upgrade.
-h3. 2016-04-25: "eebcb5e":https://dev.arvados.org/projects/arvados/repository/revisions/eebcb5e requires the crunchrunner package to be installed on compute nodes and shell nodes in order to run CWL workflows.
+h4. Crunchrunner package is required on compute and shell nodes (2016-04-25)
+
+Commit "eebcb5e":https://dev.arvados.org/projects/arvados/repository/revisions/eebcb5e requires the crunchrunner package to be installed on compute nodes and shell nodes in order to run CWL workflows.
+
* On each Debian-based compute node and shell node, run: @sudo apt-get install crunchrunner@
* On each Red Hat-based compute node and shell node, run: @sudo yum install crunchrunner@
-h3. 2016-04-21: "3c88abd":https://dev.arvados.org/projects/arvados/repository/revisions/3c88abd changes the Keep permission signature algorithm.
+h4. Keep permission signature algorithm change (2016-04-21)
+
+Commit "3c88abd":https://dev.arvados.org/projects/arvados/repository/revisions/3c88abd changes the Keep permission signature algorithm.
+
* All software components that generate signatures must be upgraded together. These are: keepstore, API server, keep-block-check, and keep-rsync. For example, if keepstore < 0.1.20160421183420 but API server >= 0.1.20160421183420, clients will not be able to read or write data in Keep.
* Jobs and client operations that are in progress during the upgrade (including arv-put's "resume cache") will fail.
-h3. 2015-01-05: "e1276d6e":https://dev.arvados.org/projects/arvados/repository/revisions/e1276d6e disables Workbench's "Getting Started" popup by default.
+h4. Workbench's "Getting Started" popup disabled by default (2015-01-05)
+
+Commit "e1276d6e":https://dev.arvados.org/projects/arvados/repository/revisions/e1276d6e disables Workbench's "Getting Started" popup by default.
+
* If you want new users to continue seeing this popup, set @enable_getting_started_popup: true@ in Workbench's @application.yml@ configuration.
-h3. 2015-12-03: "5590c9ac":https://dev.arvados.org/projects/arvados/repository/revisions/5590c9ac makes a Keep-backed writable scratch directory available in crunch jobs (see "#7751":https://dev.arvados.org/issues/7751)
+h4. Crunch jobs now have access to Keep-backed writable scratch storage (2015-12-03)
+
+Commit "5590c9ac":https://dev.arvados.org/projects/arvados/repository/revisions/5590c9ac makes a Keep-backed writable scratch directory available in crunch jobs (see "#7751":https://dev.arvados.org/issues/7751)
+
* All compute nodes must be upgraded to arvados-fuse >= 0.1.2015112518060 because crunch-job uses some new arv-mount flags (--mount-tmp, --mount-by-pdh) introduced in merge "346a558":https://dev.arvados.org/projects/arvados/repository/revisions/346a558
* Jobs will fail if the API server (in particular crunch-job from the arvados-cli gem) is upgraded without upgrading arvados-fuse on compute nodes.
-h3. 2015-11-11: "1e2ace5":https://dev.arvados.org/projects/arvados/repository/revisions/1e2ace5 changes recommended config for keep-web (see "#5824":https://dev.arvados.org/issues/5824)
+h4. Recommended configuration change for keep-web (2015-11-11)
+
+Commit "1e2ace5":https://dev.arvados.org/projects/arvados/repository/revisions/1e2ace5 changes recommended config for keep-web (see "#5824":https://dev.arvados.org/issues/5824)
+
* proxy/dns/ssl config should be updated to route "https://download.uuid_prefix.arvadosapi.com/" requests to keep-web (alongside the existing "collections" routing)
* keep-web command line adds @-attachment-only-host download.uuid_prefix.arvadosapi.com@
* Workbench config adds @keep_web_download_url@
* More info on the (still beta/non-TOC-linked) "keep-web doc page":http://doc.arvados.org/install/install-keep-web.html
-h3. 2015-11-04: "1d1c6de":https://dev.arvados.org/projects/arvados/repository/revisions/1d1c6de removes stopped containers (see "#7444":https://dev.arvados.org/issues/7444)
+h4. Stopped containers are now automatically removed on compute nodes (2015-11-04)
+
+Commit "1d1c6de":https://dev.arvados.org/projects/arvados/repository/revisions/1d1c6de removes stopped containers (see "#7444":https://dev.arvados.org/issues/7444)
+
* arvados-docker-cleaner removes _all_ docker containers as soon as they exit, effectively making @docker run@ default to @--rm@. If you run arvados-docker-cleaner on a host that does anything other than run crunch-jobs, and you still want to be able to use @docker start@, read the "new doc page":http://doc.arvados.org/install/install-compute-node.html to learn how to turn this off before upgrading.
-h3. 2015-11-04: "21006cf":https://dev.arvados.org/projects/arvados/repository/revisions/21006cf adds a keep-web service (see "#5824":https://dev.arvados.org/issues/5824)
-* Nothing relies on it yet, but early adopters can install it now by following http://doc.arvados.org/install/install-keep-web.html (it is not yet linked in the TOC).
+h4. New keep-web service (2015-11-04)
+
+Commit "21006cf":https://dev.arvados.org/projects/arvados/repository/revisions/21006cf adds a new keep-web service (see "#5824":https://dev.arvados.org/issues/5824).
+
+* Nothing relies on keep-web yet, but early adopters can install it now by following http://doc.arvados.org/install/install-keep-web.html (it is not yet linked in the TOC).
_This feature is under development. Support for federation is limited to certain types of requests. The behaviors described here should not be interpreted as a stable API._
+Detailed configuration information is available on the "federation admin section":{{site.baseurl}}/admin/federation.html.
+
h2(#cluster_id). Cluster identifiers
-Clusters are identified by a five-digit alphanumeric id (numbers and lowercase letters). There are 36^5^ = 60466176 possible cluster identifiers.
+Clusters are identified by a five-digit alphanumeric id (numbers and lowercase letters). There are 36 ^5^ = 60466176 possible cluster identifiers.
* For automated test purposes, use "z****"
* For experimental/local-only/private clusters that won't ever be visible on the public Internet, use "x****"
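As a minimal sketch, a cluster identifier can be validated with a short Go check (the helper name @isValidClusterID@ is made up for illustration):

```go
package main

import (
	"fmt"
	"regexp"
)

// A cluster identifier is exactly five characters, each a digit or a
// lowercase letter, giving 36^5 = 60466176 possible identifiers.
var clusterIDRe = regexp.MustCompile(`^[a-z0-9]{5}$`)

// isValidClusterID reports whether id is a syntactically valid cluster id.
func isValidClusterID(id string) bool {
	return clusterIDRe.MatchString(id)
}

func main() {
	fmt.Println(isValidClusterID("clsr2")) // true
	fmt.Println(isValidClusterID("CLSR2")) // false: uppercase not allowed
	fmt.Println(isValidClusterID("xdev"))  // false: too short
}
```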
h3. Authenticating remote users with salted tokens
-When making a request to the home cluster, authorization is established by looking up the API token in the @api_client_authorizations@ table to determine the user identity. When making a request to a remote cluster, we need to provide an API token which can be used to establish the user's identity. The remote cluster will connect back to the home cluster to determine if the token valid and the user it corresponds to. However, we do not want to send along the same API token used for the original request. If the remote cluster is malicious or compromised, sending along user's regular token would compromise the user account on the home cluster. Instead, the controller sends a "salted token". The salted token is restricted to only to fetching the user account and group membership. The salted token consists of the uuid of the token in @api_client_authorizations@ and the SHA1 HMAC of the original token and the cluster id of remote cluster. To verify the token, the remote cluster contacts the home cluster and provides the token uuid, the hash, and its cluster id. The home cluster uses the uuid to look up the token re-computes the SHA1 HMAC of the original token and cluster id. If that hash matches, the the token is valid. To avoid having to re-validate the token on every request, it is cached for a short period.
+When making a request to the home cluster, authorization is established by looking up the API token in the @api_client_authorizations@ table to determine the user identity. When making a request to a remote cluster, we need to provide an API token which can be used to establish the user's identity. The remote cluster will connect back to the home cluster to determine if the token is valid and which user it corresponds to. However, we do not want to send along the same API token used for the original request: if the remote cluster is malicious or compromised, sending along the user's regular token would compromise the user account on the home cluster. Instead, the controller sends a "salted token". The salted token is restricted to fetching only the user account and group membership. The salted token consists of the uuid of the token in @api_client_authorizations@ and the SHA1 HMAC of the original token and the cluster id of the remote cluster. To verify the token, the remote cluster contacts the home cluster and provides the token uuid, the hash, and its cluster id. The home cluster uses the uuid to look up the token and re-computes the SHA1 HMAC of the original token and cluster id. If the hashes match, the token is valid. To avoid having to re-validate the token on every request, it is cached for a short period.
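The salting computation described above can be sketched in Go (a minimal illustration, assuming the original token secret is used as the HMAC key and the remote cluster id as the message; @saltToken@ is a made-up helper name, not the exact Arvados implementation):

```go
package main

import (
	"crypto/hmac"
	"crypto/sha1"
	"fmt"
)

// saltToken computes the SHA1 HMAC of the original token secret and the
// remote cluster id, as described above. Which value plays the role of
// the HMAC key is an assumption in this sketch.
func saltToken(tokenSecret, remoteClusterID string) string {
	mac := hmac.New(sha1.New, []byte(tokenSecret))
	mac.Write([]byte(remoteClusterID))
	return fmt.Sprintf("%x", mac.Sum(nil))
}

func main() {
	// The salted token sent to the remote cluster pairs the token uuid
	// with this hash; the home cluster can recompute and compare it.
	fmt.Println(saltToken("examplesecret", "clsr2"))
}
```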
The security properties of this scheme are:
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "crypto/md5"
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "strings"
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+func rewriteSignatures(clusterID string, expectHash string,
+ resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+
+ if requestError != nil {
+ return resp, requestError
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ return resp, nil
+ }
+
+ originalBody := resp.Body
+ defer originalBody.Close()
+
+ var col arvados.Collection
+ err = json.NewDecoder(resp.Body).Decode(&col)
+ if err != nil {
+ return nil, err
+ }
+
+ // rewriting signatures will make manifest text 5-10% bigger so calculate
+ // capacity accordingly
+ updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
+
+ hasher := md5.New()
+ mw := io.MultiWriter(hasher, updatedManifest)
+ sz := 0
+
+ scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
+ scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
+ for scanner.Scan() {
+ line := scanner.Text()
+ tokens := strings.Split(line, " ")
+ if len(tokens) < 3 {
+ return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+ }
+
+ n, err := mw.Write([]byte(tokens[0]))
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
+ for _, token := range tokens[1:] {
+ n, err = mw.Write([]byte(" "))
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
+
+ m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
+ if m != nil {
+ // Rewrite the block signature to be a remote signature
+ _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+
+ // for hash checking, ignore signatures
+ n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
+ } else {
+ n, err = mw.Write([]byte(token))
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
+ }
+ }
+ n, err = mw.Write([]byte("\n"))
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
+ }
+
+ // Check that expected hash is consistent with
+ // portable_data_hash field of the returned record
+ if expectHash == "" {
+ expectHash = col.PortableDataHash
+ } else if expectHash != col.PortableDataHash {
+ return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", expectHash, col.PortableDataHash)
+ }
+
+ // Certify that the computed hash of the manifest_text matches our expectation
+ sum := hasher.Sum(nil)
+ computedHash := fmt.Sprintf("%x+%v", sum, sz)
+ if computedHash != expectHash {
+ return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
+ }
+
+ col.ManifestText = updatedManifest.String()
+
+ newbody, err := json.Marshal(col)
+ if err != nil {
+ return nil, err
+ }
+
+ buf := bytes.NewBuffer(newbody)
+ resp.Body = ioutil.NopCloser(buf)
+ resp.ContentLength = int64(buf.Len())
+ resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
+
+ return resp, nil
+}
+
+func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+ if requestError != nil {
+ return resp, requestError
+ }
+
+ if resp.StatusCode == http.StatusNotFound {
+ // Suppress returning this result, because we want to
+ // search the federation.
+ return nil, nil
+ }
+ return resp, nil
+}
+
+type searchRemoteClusterForPDH struct {
+ pdh string
+ remoteID string
+ mtx *sync.Mutex
+ sentResponse *bool
+ sharedContext *context.Context
+ cancelFunc func()
+ errors *[]string
+ statusCode *int
+}
+
+func fetchRemoteCollectionByUUID(
+ h *genericFederatedRequestHandler,
+ effectiveMethod string,
+ clusterId *string,
+ uuid string,
+ remainder string,
+ w http.ResponseWriter,
+ req *http.Request) bool {
+
+ if effectiveMethod != "GET" {
+ // Only handle GET requests right now
+ return false
+ }
+
+ if uuid != "" {
+ // Collection UUID GET request
+ *clusterId = uuid[0:5]
+ if *clusterId != "" && *clusterId != h.handler.Cluster.ClusterID {
+ // request for remote collection by uuid
+ resp, err := h.handler.remoteClusterRequest(*clusterId, req)
+ newResponse, err := rewriteSignatures(*clusterId, "", resp, err)
+ h.handler.proxy.ForwardResponse(w, newResponse, err)
+ return true
+ }
+ }
+
+ return false
+}
+
+func fetchRemoteCollectionByPDH(
+ h *genericFederatedRequestHandler,
+ effectiveMethod string,
+ clusterId *string,
+ uuid string,
+ remainder string,
+ w http.ResponseWriter,
+ req *http.Request) bool {
+
+ if effectiveMethod != "GET" {
+ // Only handle GET requests right now
+ return false
+ }
+
+ m := collectionsByPDHRe.FindStringSubmatch(req.URL.Path)
+ if len(m) != 2 {
+ return false
+ }
+
+ // Request for collection by PDH. Search the federation.
+
+ // First, query the local cluster.
+ resp, err := h.handler.localClusterRequest(req)
+ newResp, err := filterLocalClusterResponse(resp, err)
+ if newResp != nil || err != nil {
+ h.handler.proxy.ForwardResponse(w, newResp, err)
+ return true
+ }
+
+ // Create a goroutine for each cluster in the
+ // RemoteClusters map. The first valid result is
+ // returned to the client; when that happens, all
+ // other outstanding requests are cancelled.
+ sharedContext, cancelFunc := context.WithCancel(req.Context())
+ req = req.WithContext(sharedContext)
+ wg := sync.WaitGroup{}
+ pdh := m[1]
+ success := make(chan *http.Response)
+ errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters))
+
+ // use a buffered channel as a semaphore to limit the
+ // number of concurrent requests
+ sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
+
+ defer cancelFunc()
+
+ for remoteID := range h.handler.Cluster.RemoteClusters {
+ if remoteID == h.handler.Cluster.ClusterID {
+ // No need to query local cluster again
+ continue
+ }
+
+ wg.Add(1)
+ go func(remote string) {
+ defer wg.Done()
+ // acquire a semaphore slot; blocks while the
+ // concurrency limit is reached
+ sem <- true
+ // release the slot on every return path so
+ // other goroutines are not starved
+ defer func() { <-sem }()
+ select {
+ case <-sharedContext.Done():
+ return
+ default:
+ }
+
+ resp, err := h.handler.remoteClusterRequest(remote, req)
+ wasSuccess := false
+ defer func() {
+ if resp != nil && !wasSuccess {
+ resp.Body.Close()
+ }
+ }()
+ if err != nil {
+ errorChan <- err
+ return
+ }
+ if resp.StatusCode != http.StatusOK {
+ errorChan <- HTTPError{resp.Status, resp.StatusCode}
+ return
+ }
+ select {
+ case <-sharedContext.Done():
+ return
+ default:
+ }
+
+ newResponse, err := rewriteSignatures(remote, pdh, resp, nil)
+ if err != nil {
+ errorChan <- err
+ return
+ }
+ select {
+ case <-sharedContext.Done():
+ case success <- newResponse:
+ wasSuccess = true
+ }
+ }(remoteID)
+ }
+ go func() {
+ wg.Wait()
+ cancelFunc()
+ }()
+
+ errorCode := http.StatusNotFound
+
+ for {
+ select {
+ case newResp = <-success:
+ h.handler.proxy.ForwardResponse(w, newResp, nil)
+ return true
+ case <-sharedContext.Done():
+ var errors []string
+ for len(errorChan) > 0 {
+ err := <-errorChan
+ if httperr, ok := err.(HTTPError); ok {
+ if httperr.Code != http.StatusNotFound {
+ errorCode = http.StatusBadGateway
+ }
+ }
+ errors = append(errors, err.Error())
+ }
+ httpserver.Errors(w, errors, errorCode)
+ return true
+ }
+ }
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "strings"
+
+ "git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+)
+
+func remoteContainerRequestCreate(
+ h *genericFederatedRequestHandler,
+ effectiveMethod string,
+ clusterId *string,
+ uuid string,
+ remainder string,
+ w http.ResponseWriter,
+ req *http.Request) bool {
+
+ if effectiveMethod != "POST" || uuid != "" || remainder != "" {
+ return false
+ }
+
+ // First make sure supplied token is valid.
+ creds := auth.NewCredentials()
+ creds.LoadTokensFromHTTPRequest(req)
+ if len(creds.Tokens) == 0 {
+ httpserver.Error(w, "no token supplied in request", http.StatusUnauthorized)
+ return true
+ }
+
+ currentUser, err := h.handler.validateAPItoken(req, creds.Tokens[0])
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusForbidden)
+ return true
+ }
+
+ if *clusterId == "" {
+ *clusterId = h.handler.Cluster.ClusterID
+ }
+
+ if strings.HasPrefix(currentUser.Authorization.UUID, h.handler.Cluster.ClusterID) &&
+ *clusterId == h.handler.Cluster.ClusterID {
+ // local user submitting container request to local cluster
+ return false
+ }
+
+ if req.Header.Get("Content-Type") != "application/json" {
+ httpserver.Error(w, "Expected Content-Type: application/json, got "+req.Header.Get("Content-Type"), http.StatusBadRequest)
+ return true
+ }
+
+ originalBody := req.Body
+ defer originalBody.Close()
+ var request map[string]interface{}
+ err = json.NewDecoder(req.Body).Decode(&request)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return true
+ }
+
+ crString, ok := request["container_request"].(string)
+ if ok {
+ var crJson map[string]interface{}
+ err := json.Unmarshal([]byte(crString), &crJson)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return true
+ }
+
+ request["container_request"] = crJson
+ }
+
+ containerRequest, ok := request["container_request"].(map[string]interface{})
+ if !ok {
+ // Use the top-level object as the container_request object
+ containerRequest = request
+ }
+
+ // If runtime_token is not set, create a new token
+ if _, ok := containerRequest["runtime_token"]; !ok {
+ if len(currentUser.Authorization.Scopes) != 1 || currentUser.Authorization.Scopes[0] != "all" {
+ httpserver.Error(w, "Token scope is not [all]", http.StatusForbidden)
+ return true
+ }
+
+ if strings.HasPrefix(currentUser.Authorization.UUID, h.handler.Cluster.ClusterID) {
+ // Local user, submitting to a remote cluster.
+ // Create a new time-limited token.
+ newtok, err := h.handler.createAPItoken(req, currentUser.UUID, nil)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusForbidden)
+ return true
+ }
+ containerRequest["runtime_token"] = newtok.TokenV2()
+ } else {
+ // Remote user. Container request will use the
+ // current token, minus the trailing portion
+ // (optional container uuid).
+ sp := strings.Split(creds.Tokens[0], "/")
+ if len(sp) >= 3 {
+ containerRequest["runtime_token"] = strings.Join(sp[0:3], "/")
+ } else {
+ containerRequest["runtime_token"] = creds.Tokens[0]
+ }
+ }
+ }
+
+ newbody, err := json.Marshal(request)
+ if err != nil {
+ httpserver.Error(w, fmt.Sprintf("Error marshalling request body: %v", err), http.StatusInternalServerError)
+ return true
+ }
+ buf := bytes.NewBuffer(newbody)
+ req.Body = ioutil.NopCloser(buf)
+ req.ContentLength = int64(buf.Len())
+ req.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
+
+ resp, err := h.handler.remoteClusterRequest(*clusterId, req)
+ h.handler.proxy.ForwardResponse(w, resp, err)
+ return true
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "regexp"
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+)
+
+type federatedRequestDelegate func(
+ h *genericFederatedRequestHandler,
+ effectiveMethod string,
+ clusterId *string,
+ uuid string,
+ remainder string,
+ w http.ResponseWriter,
+ req *http.Request) bool
+
+type genericFederatedRequestHandler struct {
+ next http.Handler
+ handler *Handler
+ matcher *regexp.Regexp
+ delegates []federatedRequestDelegate
+}
+
+func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
+ req *http.Request,
+ clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
+
+ found := make(map[string]bool)
+ prev_len_uuids := len(uuids) + 1
+ // Loop while:
+ // (1) there are more uuids to query, and
+ // (2) we're making progress - on each iteration the set of
+ // uuids we are waiting for must shrink.
+ for len(uuids) > 0 && len(uuids) < prev_len_uuids {
+ var remoteReq http.Request
+ remoteReq.Header = req.Header
+ remoteReq.Method = "POST"
+ remoteReq.URL = &url.URL{Path: req.URL.Path}
+ remoteParams := make(url.Values)
+ remoteParams.Set("_method", "GET")
+ remoteParams.Set("count", "none")
+ if req.Form.Get("select") != "" {
+ remoteParams.Set("select", req.Form.Get("select"))
+ }
+ content, err := json.Marshal(uuids)
+ if err != nil {
+ return nil, "", err
+ }
+ remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
+ enc := remoteParams.Encode()
+ remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
+
+ rc := multiClusterQueryResponseCollector{clusterID: clusterID}
+
+ var resp *http.Response
+ if clusterID == h.handler.Cluster.ClusterID {
+ resp, err = h.handler.localClusterRequest(&remoteReq)
+ } else {
+ resp, err = h.handler.remoteClusterRequest(clusterID, &remoteReq)
+ }
+ rc.collectResponse(resp, err)
+
+ if rc.error != nil {
+ return nil, "", rc.error
+ }
+
+ kind = rc.kind
+
+ if len(rc.responses) == 0 {
+ // We got zero responses, no point in doing
+ // another query.
+ return rp, kind, nil
+ }
+
+ rp = append(rp, rc.responses...)
+
+ // Go through the responses and determine what was
+ // returned. If there are remaining items, loop
+ // around and do another request with just the
+ // stragglers.
+ for _, i := range rc.responses {
+ uuid, ok := i["uuid"].(string)
+ if ok {
+ found[uuid] = true
+ }
+ }
+
+ l := []string{}
+ for _, u := range uuids {
+ if !found[u] {
+ l = append(l, u)
+ }
+ }
+ prev_len_uuids = len(uuids)
+ uuids = l
+ }
+
+ return rp, kind, nil
+}
+
+func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
+ req *http.Request, clusterId *string) bool {
+
+ var filters [][]interface{}
+ err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return true
+ }
+
+ // Split the list of uuids by prefix
+ queryClusters := make(map[string][]string)
+ expectCount := 0
+ for _, filter := range filters {
+ if len(filter) != 3 {
+ return false
+ }
+
+ if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
+ return false
+ }
+
+ op, ok := filter[1].(string)
+ if !ok {
+ return false
+ }
+
+ if op == "in" {
+ if rhs, ok := filter[2].([]interface{}); ok {
+ for _, i := range rhs {
+ if u, ok := i.(string); ok && len(u) == 27 {
+ *clusterId = u[0:5]
+ queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ expectCount += 1
+ }
+ }
+ }
+ } else if op == "=" {
+ if u, ok := filter[2].(string); ok && len(u) == 27 {
+ *clusterId = u[0:5]
+ queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ expectCount += 1
+ }
+ } else {
+ return false
+ }
+
+ }
+
+ if len(queryClusters) <= 1 {
+ // Query does not search for uuids across multiple
+ // clusters.
+ return false
+ }
+
+ // Validations
+ count := req.Form.Get("count")
+ if count != "" && count != `none` && count != `"none"` {
+ httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
+ return true
+ }
+ if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
+ httpserver.Error(w, "Federated multi-object request may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
+ return true
+ }
+ if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
+ httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects, which exceeds the maximum page size of %v.",
+ expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
+ return true
+ }
+ if req.Form.Get("select") != "" {
+ foundUUID := false
+ var selects []string
+ err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return true
+ }
+
+ for _, r := range selects {
+ if r == "uuid" {
+ foundUUID = true
+ break
+ }
+ }
+ if !foundUUID {
+ httpserver.Error(w, "Federated multi-object request must include 'uuid' in 'select'", http.StatusBadRequest)
+ return true
+ }
+ }
+
+ // Perform concurrent requests to each cluster
+
+ // use a buffered channel as a semaphore to limit the
+ // number of concurrent requests
+ sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
+ defer close(sem)
+ wg := sync.WaitGroup{}
+
+ req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+ mtx := sync.Mutex{}
+ errors := []error{}
+ var completeResponses []map[string]interface{}
+ var kind string
+
+ for k, v := range queryClusters {
+ if len(v) == 0 {
+ // Nothing to query
+ continue
+ }
+
+ // blocks until a semaphore slot is free (the channel's
+ // capacity is the concurrency limit)
+ sem <- true
+ wg.Add(1)
+ go func(k string, v []string) {
+ rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
+ mtx.Lock()
+ if err == nil {
+ completeResponses = append(completeResponses, rp...)
+ kind = kn
+ } else {
+ errors = append(errors, err)
+ }
+ mtx.Unlock()
+ wg.Done()
+ <-sem
+ }(k, v)
+ }
+ wg.Wait()
+
+ if len(errors) > 0 {
+ var strerr []string
+ for _, e := range errors {
+ strerr = append(strerr, e.Error())
+ }
+ httpserver.Errors(w, strerr, http.StatusBadGateway)
+ return true
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ itemList := make(map[string]interface{})
+ itemList["items"] = completeResponses
+ itemList["kind"] = kind
+ json.NewEncoder(w).Encode(itemList)
+
+ return true
+}
+
+func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ m := h.matcher.FindStringSubmatch(req.URL.Path)
+ clusterId := ""
+
+ if len(m) > 0 && m[2] != "" {
+ clusterId = m[2]
+ }
+
+ // Get form parameters from URL and form body (if POST).
+ if err := loadParamsFromForm(req); err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ // Check if the parameters have an explicit cluster_id
+ if req.Form.Get("cluster_id") != "" {
+ clusterId = req.Form.Get("cluster_id")
+ }
+
+ // Handle the POST-as-GET special case (workaround for large
+ // GET requests that potentially exceed maximum URL length,
+ // like multi-object queries where the filter has 100s of
+ // items)
+ effectiveMethod := req.Method
+ if req.Method == "POST" && req.Form.Get("_method") != "" {
+ effectiveMethod = req.Form.Get("_method")
+ }
+
+ if effectiveMethod == "GET" &&
+ clusterId == "" &&
+ req.Form.Get("filters") != "" &&
+ h.handleMultiClusterQuery(w, req, &clusterId) {
+ return
+ }
+
+ var uuid string
+ if len(m[1]) > 0 {
+ // trim leading slash
+ uuid = m[1][1:]
+ }
+ for _, d := range h.delegates {
+ if d(h, effectiveMethod, &clusterId, uuid, m[3], w, req) {
+ return
+ }
+ }
+
+ if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
+ h.next.ServeHTTP(w, req)
+ } else {
+ resp, err := h.handler.remoteClusterRequest(clusterId, req)
+ h.handler.proxy.ForwardResponse(w, resp, err)
+ }
+}
+
+type multiClusterQueryResponseCollector struct {
+ responses []map[string]interface{}
+ error error
+ kind string
+ clusterID string
+}
+
+func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
+ requestError error) (newResponse *http.Response, err error) {
+ if requestError != nil {
+ c.error = requestError
+ return nil, nil
+ }
+
+ defer resp.Body.Close()
+ var loadInto struct {
+ Kind string `json:"kind"`
+ Items []map[string]interface{} `json:"items"`
+ Errors []string `json:"errors"`
+ }
+ err = json.NewDecoder(resp.Body).Decode(&loadInto)
+
+ if err != nil {
+ c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
+ return nil, nil
+ }
+ if resp.StatusCode != http.StatusOK {
+ c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
+ return nil, nil
+ }
+
+ c.responses = loadInto.Items
+ c.kind = loadInto.Kind
+
+ return nil, nil
+}
package controller
import (
- "bufio"
"bytes"
- "context"
- "crypto/md5"
"database/sql"
"encoding/json"
"fmt"
"io"
"io/ioutil"
+ "mime"
"net/http"
"net/url"
"regexp"
"strings"
- "sync"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/jmcvetta/randutil"
)
var pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-[0-9a-z]{15})?(.*)$`
var wfRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "workflows", "7fd4e"))
var containersRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "containers", "dz642"))
var containerRequestsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "container_requests", "xvhdp"))
-var collectionRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "collections", "4zz18"))
-var collectionByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)+$`)
+var collectionsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "collections", "4zz18"))
+var collectionsByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)+$`)
+var linksRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "links", "o0j2j"))
-type genericFederatedRequestHandler struct {
- next http.Handler
- handler *Handler
- matcher *regexp.Regexp
-}
-
-type collectionFederatedRequestHandler struct {
- next http.Handler
- handler *Handler
-}
-
-func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, req *http.Request, filter ResponseFilter) {
+func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*http.Response, error) {
remote, ok := h.Cluster.RemoteClusters[remoteID]
if !ok {
- httpserver.Error(w, "no proxy available for cluster "+remoteID, http.StatusNotFound)
- return
+ return nil, HTTPError{fmt.Sprintf("no proxy available for cluster %v", remoteID), http.StatusNotFound}
}
scheme := remote.Scheme
if scheme == "" {
scheme = "https"
}
- err := h.saltAuthToken(req, remoteID)
+ saltedReq, err := h.saltAuthToken(req, remoteID)
if err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- return
+ return nil, err
}
urlOut := &url.URL{
Scheme: scheme,
Host: remote.Host,
- Path: req.URL.Path,
- RawPath: req.URL.RawPath,
- RawQuery: req.URL.RawQuery,
+ Path: saltedReq.URL.Path,
+ RawPath: saltedReq.URL.RawPath,
+ RawQuery: saltedReq.URL.RawQuery,
}
client := h.secureClient
if remote.Insecure {
client = h.insecureClient
}
- h.proxy.Do(w, req, urlOut, client, filter)
+ return h.proxy.Do(saltedReq, urlOut, client)
}
// Buffer the request body and parse form parameters, then restore the
// body so it can be re-read by downstream proxy steps.
func loadParamsFromForm(req *http.Request) error {
var postBody *bytes.Buffer
- if req.Body != nil && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
+ if ct := req.Header.Get("Content-Type"); ct == "" {
+ // Assume application/octet-stream, i.e., no form to parse.
+ } else if ct, _, err := mime.ParseMediaType(ct); err != nil {
+ return err
+ } else if ct == "application/x-www-form-urlencoded" && req.Body != nil {
var cl int64
if req.ContentLength > 0 {
cl = req.ContentLength
return nil
}
-type multiClusterQueryResponseCollector struct {
- responses []map[string]interface{}
- error error
- kind string
- clusterID string
-}
-
-func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
- requestError error) (newResponse *http.Response, err error) {
- if requestError != nil {
- c.error = requestError
- return nil, nil
- }
-
- defer resp.Body.Close()
- var loadInto struct {
- Kind string `json:"kind"`
- Items []map[string]interface{} `json:"items"`
- Errors []string `json:"errors"`
- }
- err = json.NewDecoder(resp.Body).Decode(&loadInto)
-
- if err != nil {
- c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
- return nil, nil
- }
- if resp.StatusCode != http.StatusOK {
- c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
- return nil, nil
- }
-
- c.responses = loadInto.Items
- c.kind = loadInto.Kind
-
- return nil, nil
-}
-
-func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
- req *http.Request,
- clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
-
- found := make(map[string]bool)
- prev_len_uuids := len(uuids) + 1
- // Loop while
- // (1) there are more uuids to query
- // (2) we're making progress - on each iteration the set of
- // uuids we are expecting for must shrink.
- for len(uuids) > 0 && len(uuids) < prev_len_uuids {
- var remoteReq http.Request
- remoteReq.Header = req.Header
- remoteReq.Method = "POST"
- remoteReq.URL = &url.URL{Path: req.URL.Path}
- remoteParams := make(url.Values)
- remoteParams.Set("_method", "GET")
- remoteParams.Set("count", "none")
- if req.Form.Get("select") != "" {
- remoteParams.Set("select", req.Form.Get("select"))
- }
- content, err := json.Marshal(uuids)
- if err != nil {
- return nil, "", err
- }
- remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
- enc := remoteParams.Encode()
- remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
-
- rc := multiClusterQueryResponseCollector{clusterID: clusterID}
-
- if clusterID == h.handler.Cluster.ClusterID {
- h.handler.localClusterRequest(w, &remoteReq,
- rc.collectResponse)
- } else {
- h.handler.remoteClusterRequest(clusterID, w, &remoteReq,
- rc.collectResponse)
- }
- if rc.error != nil {
- return nil, "", rc.error
- }
-
- kind = rc.kind
-
- if len(rc.responses) == 0 {
- // We got zero responses, no point in doing
- // another query.
- return rp, kind, nil
- }
-
- rp = append(rp, rc.responses...)
-
- // Go through the responses and determine what was
- // returned. If there are remaining items, loop
- // around and do another request with just the
- // stragglers.
- for _, i := range rc.responses {
- uuid, ok := i["uuid"].(string)
- if ok {
- found[uuid] = true
- }
- }
-
- l := []string{}
- for _, u := range uuids {
- if !found[u] {
- l = append(l, u)
- }
- }
- prev_len_uuids = len(uuids)
- uuids = l
- }
-
- return rp, kind, nil
-}
-
-func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
- req *http.Request, clusterId *string) bool {
-
- var filters [][]interface{}
- err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- return true
- }
-
- // Split the list of uuids by prefix
- queryClusters := make(map[string][]string)
- expectCount := 0
- for _, filter := range filters {
- if len(filter) != 3 {
- return false
- }
-
- if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
- return false
- }
-
- op, ok := filter[1].(string)
- if !ok {
- return false
- }
-
- if op == "in" {
- if rhs, ok := filter[2].([]interface{}); ok {
- for _, i := range rhs {
- if u, ok := i.(string); ok {
- *clusterId = u[0:5]
- queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
- expectCount += 1
- }
- }
- }
- } else if op == "=" {
- if u, ok := filter[2].(string); ok {
- *clusterId = u[0:5]
- queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
- expectCount += 1
- }
- } else {
- return false
- }
-
- }
-
- if len(queryClusters) <= 1 {
- // Query does not search for uuids across multiple
- // clusters.
- return false
- }
-
- // Validations
- count := req.Form.Get("count")
- if count != "" && count != `none` && count != `"none"` {
- httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
- return true
- }
- if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
- httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
- return true
- }
- if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
- httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
- expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
- return true
- }
- if req.Form.Get("select") != "" {
- foundUUID := false
- var selects []string
- err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- return true
- }
-
- for _, r := range selects {
- if r == "uuid" {
- foundUUID = true
- break
- }
- }
- if !foundUUID {
- httpserver.Error(w, "Federated multi-object request must include 'uuid' in 'select'", http.StatusBadRequest)
- return true
- }
- }
-
- // Perform concurrent requests to each cluster
-
- // use channel as a semaphore to limit the number of concurrent
- // requests at a time
- sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
- defer close(sem)
- wg := sync.WaitGroup{}
-
- req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
- mtx := sync.Mutex{}
- errors := []error{}
- var completeResponses []map[string]interface{}
- var kind string
-
- for k, v := range queryClusters {
- if len(v) == 0 {
- // Nothing to query
- continue
- }
-
- // blocks until it can put a value into the
- // channel (which has a max queue capacity)
- sem <- true
- wg.Add(1)
- go func(k string, v []string) {
- rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
- mtx.Lock()
- if err == nil {
- completeResponses = append(completeResponses, rp...)
- kind = kn
- } else {
- errors = append(errors, err)
- }
- mtx.Unlock()
- wg.Done()
- <-sem
- }(k, v)
- }
- wg.Wait()
-
- if len(errors) > 0 {
- var strerr []string
- for _, e := range errors {
- strerr = append(strerr, e.Error())
- }
- httpserver.Errors(w, strerr, http.StatusBadGateway)
- return true
- }
-
- w.Header().Set("Content-Type", "application/json")
- w.WriteHeader(http.StatusOK)
- itemList := make(map[string]interface{})
- itemList["items"] = completeResponses
- itemList["kind"] = kind
- json.NewEncoder(w).Encode(itemList)
-
- return true
-}
-
-func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- m := h.matcher.FindStringSubmatch(req.URL.Path)
- clusterId := ""
-
- if len(m) > 0 && m[2] != "" {
- clusterId = m[2]
- }
-
- // Get form parameters from URL and form body (if POST).
- if err := loadParamsFromForm(req); err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
-
- // Check if the parameters have an explicit cluster_id
- if req.Form.Get("cluster_id") != "" {
- clusterId = req.Form.Get("cluster_id")
- }
-
- // Handle the POST-as-GET special case (workaround for large
- // GET requests that potentially exceed maximum URL length,
- // like multi-object queries where the filter has 100s of
- // items)
- effectiveMethod := req.Method
- if req.Method == "POST" && req.Form.Get("_method") != "" {
- effectiveMethod = req.Form.Get("_method")
- }
-
- if effectiveMethod == "GET" &&
- clusterId == "" &&
- req.Form.Get("filters") != "" &&
- h.handleMultiClusterQuery(w, req, &clusterId) {
- return
- }
-
- if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
- h.next.ServeHTTP(w, req)
- } else {
- h.handler.remoteClusterRequest(clusterId, w, req, nil)
- }
-}
-
-type rewriteSignaturesClusterId struct {
- clusterID string
- expectHash string
-}
-
-func (rw rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
- if requestError != nil {
- return resp, requestError
- }
-
- if resp.StatusCode != 200 {
- return resp, nil
- }
-
- originalBody := resp.Body
- defer originalBody.Close()
-
- var col arvados.Collection
- err = json.NewDecoder(resp.Body).Decode(&col)
- if err != nil {
- return nil, err
- }
-
- // rewriting signatures will make manifest text 5-10% bigger so calculate
- // capacity accordingly
- updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
-
- hasher := md5.New()
- mw := io.MultiWriter(hasher, updatedManifest)
- sz := 0
-
- scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
- scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
- for scanner.Scan() {
- line := scanner.Text()
- tokens := strings.Split(line, " ")
- if len(tokens) < 3 {
- return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
- }
-
- n, err := mw.Write([]byte(tokens[0]))
- if err != nil {
- return nil, fmt.Errorf("Error updating manifest: %v", err)
- }
- sz += n
- for _, token := range tokens[1:] {
- n, err = mw.Write([]byte(" "))
- if err != nil {
- return nil, fmt.Errorf("Error updating manifest: %v", err)
- }
- sz += n
-
- m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
- if m != nil {
- // Rewrite the block signature to be a remote signature
- _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], rw.clusterID, m[5][2:], m[8])
- if err != nil {
- return nil, fmt.Errorf("Error updating manifest: %v", err)
- }
-
- // for hash checking, ignore signatures
- n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
- if err != nil {
- return nil, fmt.Errorf("Error updating manifest: %v", err)
- }
- sz += n
- } else {
- n, err = mw.Write([]byte(token))
- if err != nil {
- return nil, fmt.Errorf("Error updating manifest: %v", err)
- }
- sz += n
- }
- }
- n, err = mw.Write([]byte("\n"))
- if err != nil {
- return nil, fmt.Errorf("Error updating manifest: %v", err)
- }
- sz += n
- }
-
- // Check that expected hash is consistent with
- // portable_data_hash field of the returned record
- if rw.expectHash == "" {
- rw.expectHash = col.PortableDataHash
- } else if rw.expectHash != col.PortableDataHash {
- return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", rw.expectHash, col.PortableDataHash)
- }
-
- // Certify that the computed hash of the manifest_text matches our expectation
- sum := hasher.Sum(nil)
- computedHash := fmt.Sprintf("%x+%v", sum, sz)
- if computedHash != rw.expectHash {
- return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, rw.expectHash)
- }
-
- col.ManifestText = updatedManifest.String()
-
- newbody, err := json.Marshal(col)
- if err != nil {
- return nil, err
- }
-
- buf := bytes.NewBuffer(newbody)
- resp.Body = ioutil.NopCloser(buf)
- resp.ContentLength = int64(buf.Len())
- resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
-
- return resp, nil
-}
-
-func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
- if requestError != nil {
- return resp, requestError
- }
-
- if resp.StatusCode == 404 {
- // Suppress returning this result, because we want to
- // search the federation.
- return nil, nil
- }
- return resp, nil
-}
-
-type searchRemoteClusterForPDH struct {
- pdh string
- remoteID string
- mtx *sync.Mutex
- sentResponse *bool
- sharedContext *context.Context
- cancelFunc func()
- errors *[]string
- statusCode *int
-}
-
-func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
- s.mtx.Lock()
- defer s.mtx.Unlock()
-
- if *s.sentResponse {
- // Another request already returned a response
- return nil, nil
- }
-
- if requestError != nil {
- *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
- // Record the error and suppress response
- return nil, nil
- }
-
- if resp.StatusCode != 200 {
- // Suppress returning unsuccessful result. Maybe
- // another request will find it.
- // TODO collect and return error responses.
- *s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
- if resp.StatusCode != 404 {
- // Got a non-404 error response, convert into BadGateway
- *s.statusCode = http.StatusBadGateway
- }
- return nil, nil
- }
-
- s.mtx.Unlock()
-
- // This reads the response body. We don't want to hold the
- // lock while doing this because other remote requests could
- // also have made it to this point, and we don't want a
- // slow response holding the lock to block a faster response
- // that is waiting on the lock.
- newResponse, err = rewriteSignaturesClusterId{s.remoteID, s.pdh}.rewriteSignatures(resp, nil)
-
- s.mtx.Lock()
-
- if *s.sentResponse {
- // Another request already returned a response
- return nil, nil
- }
-
- if err != nil {
- // Suppress returning unsuccessful result. Maybe
- // another request will be successful.
- *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
- return nil, nil
- }
-
- // We have a successful response. Suppress/cancel all the
- // other requests/responses.
- *s.sentResponse = true
- s.cancelFunc()
-
- return newResponse, nil
-}
-
-func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- if req.Method != "GET" {
- // Only handle GET requests right now
- h.next.ServeHTTP(w, req)
- return
- }
-
- m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
- if len(m) != 2 {
- // Not a collection PDH GET request
- m = collectionRe.FindStringSubmatch(req.URL.Path)
- clusterId := ""
-
- if len(m) > 0 {
- clusterId = m[2]
- }
-
- if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
- // request for remote collection by uuid
- h.handler.remoteClusterRequest(clusterId, w, req,
- rewriteSignaturesClusterId{clusterId, ""}.rewriteSignatures)
- return
- }
- // not a collection UUID request, or it is a request
- // for a local UUID, either way, continue down the
- // handler stack.
- h.next.ServeHTTP(w, req)
- return
- }
-
- // Request for collection by PDH. Search the federation.
-
- // First, query the local cluster.
- if h.handler.localClusterRequest(w, req, filterLocalClusterResponse) {
- return
- }
-
- sharedContext, cancelFunc := context.WithCancel(req.Context())
- defer cancelFunc()
- req = req.WithContext(sharedContext)
-
- // Create a goroutine for each cluster in the
- // RemoteClusters map. The first valid result gets
- // returned to the client. When that happens, all
- // other outstanding requests are cancelled or
- // suppressed.
- sentResponse := false
- mtx := sync.Mutex{}
- wg := sync.WaitGroup{}
- var errors []string
- var errorCode int = 404
-
- // use channel as a semaphore to limit the number of concurrent
- // requests at a time
- sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
- defer close(sem)
- for remoteID := range h.handler.Cluster.RemoteClusters {
- // blocks until it can put a value into the
- // channel (which has a max queue capacity)
- sem <- true
- if sentResponse {
- break
- }
- search := &searchRemoteClusterForPDH{m[1], remoteID, &mtx, &sentResponse,
- &sharedContext, cancelFunc, &errors, &errorCode}
- wg.Add(1)
- go func() {
- h.handler.remoteClusterRequest(search.remoteID, w, req, search.filterRemoteClusterResponse)
- wg.Done()
- <-sem
- }()
- }
- wg.Wait()
-
- if sentResponse {
- return
- }
-
- // No successful responses, so return the error
- httpserver.Errors(w, errors, errorCode)
-}
-
func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
mux := http.NewServeMux()
- mux.Handle("/arvados/v1/workflows", &genericFederatedRequestHandler{next, h, wfRe})
- mux.Handle("/arvados/v1/workflows/", &genericFederatedRequestHandler{next, h, wfRe})
- mux.Handle("/arvados/v1/containers", &genericFederatedRequestHandler{next, h, containersRe})
- mux.Handle("/arvados/v1/containers/", &genericFederatedRequestHandler{next, h, containersRe})
- mux.Handle("/arvados/v1/container_requests", &genericFederatedRequestHandler{next, h, containerRequestsRe})
- mux.Handle("/arvados/v1/container_requests/", &genericFederatedRequestHandler{next, h, containerRequestsRe})
- mux.Handle("/arvados/v1/collections", next)
- mux.Handle("/arvados/v1/collections/", &collectionFederatedRequestHandler{next, h})
+
+ wfHandler := &genericFederatedRequestHandler{next, h, wfRe, nil}
+ containersHandler := &genericFederatedRequestHandler{next, h, containersRe, nil}
+ containerRequestsHandler := &genericFederatedRequestHandler{next, h, containerRequestsRe,
+ []federatedRequestDelegate{remoteContainerRequestCreate}}
+ collectionsRequestsHandler := &genericFederatedRequestHandler{next, h, collectionsRe,
+ []federatedRequestDelegate{fetchRemoteCollectionByUUID, fetchRemoteCollectionByPDH}}
+ linksRequestsHandler := &genericFederatedRequestHandler{next, h, linksRe, nil}
+
+ mux.Handle("/arvados/v1/workflows", wfHandler)
+ mux.Handle("/arvados/v1/workflows/", wfHandler)
+ mux.Handle("/arvados/v1/containers", containersHandler)
+ mux.Handle("/arvados/v1/containers/", containersHandler)
+ mux.Handle("/arvados/v1/container_requests", containerRequestsHandler)
+ mux.Handle("/arvados/v1/container_requests/", containerRequestsHandler)
+ mux.Handle("/arvados/v1/collections", collectionsRequestsHandler)
+ mux.Handle("/arvados/v1/collections/", collectionsRequestsHandler)
+ mux.Handle("/arvados/v1/links", linksRequestsHandler)
+ mux.Handle("/arvados/v1/links/", linksRequestsHandler)
mux.Handle("/", next)
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
UUID string
}
-func (h *Handler) validateAPItoken(req *http.Request, user *CurrentUser) error {
+// validateAPItoken extracts the token from the provided http request,
+// checks it against the api_client_authorizations table in the database,
+// and fills in the token scope and user UUID. Does not handle remote
+// tokens unless they are already in the database and not expired.
+func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, error) {
+ user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}}
db, err := h.db(req)
if err != nil {
- return err
+ return nil, err
+ }
+
+ var uuid string
+ if strings.HasPrefix(token, "v2/") {
+ sp := strings.Split(token, "/")
+ uuid = sp[1]
+ token = sp[2]
+ }
+ user.Authorization.APIToken = token
+ var scopes string
+ err = db.QueryRowContext(req.Context(), `SELECT api_client_authorizations.uuid, api_client_authorizations.scopes, users.uuid FROM api_client_authorizations JOIN users on api_client_authorizations.user_id=users.id WHERE api_token=$1 AND (expires_at IS NULL OR expires_at > current_timestamp) LIMIT 1`, token).Scan(&user.Authorization.UUID, &scopes, &user.UUID)
+ if err != nil {
+ return nil, err
+ }
+ if uuid != "" && user.Authorization.UUID != uuid {
+ return nil, fmt.Errorf("UUID embedded in v2 token did not match record")
+ }
+ err = json.Unmarshal([]byte(scopes), &user.Authorization.Scopes)
+ if err != nil {
+ return nil, err
+ }
+ return &user, nil
+}
+
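The v2 token handling above relies on the `v2/<authorization-uuid>/<secret>` layout. A minimal standalone sketch of that split (the helper name `splitToken` is hypothetical, not part of the patch):

```go
package main

import (
	"fmt"
	"strings"
)

// splitToken separates a "v2/<uuid>/<secret>" token into its UUID and
// secret, as done inline in validateAPItoken above. A bare (v1) token has
// no embedded UUID, so only the secret is returned.
func splitToken(token string) (uuid, secret string) {
	if strings.HasPrefix(token, "v2/") {
		sp := strings.Split(token, "/")
		if len(sp) >= 3 {
			// sp[3:] (e.g. a trailing container UUID) is ignored here.
			return sp[1], sp[2]
		}
	}
	return "", token
}

func main() {
	uuid, secret := splitToken("v2/zzzzz-gj3su-0123456789abcde/secretpart")
	fmt.Println(uuid, secret)
}
```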
+func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) {
+ db, err := h.db(req)
+ if err != nil {
+ return nil, err
+ }
+ rd, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
+ if err != nil {
+ return nil, err
+ }
+ uuid := fmt.Sprintf("%v-gj3su-%v", h.Cluster.ClusterID, rd)
+ token, err := randutil.String(50, "abcdefghijklmnopqrstuvwxyz0123456789")
+ if err != nil {
+ return nil, err
+ }
+ if len(scopes) == 0 {
+ scopes = append(scopes, "all")
+ }
+ scopesjson, err := json.Marshal(scopes)
+ if err != nil {
+ return nil, err
+ }
+ _, err = db.ExecContext(req.Context(),
+ `INSERT INTO api_client_authorizations
+(uuid, api_token, expires_at, scopes,
+user_id,
+api_client_id, created_at, updated_at)
+VALUES ($1, $2, CURRENT_TIMESTAMP + INTERVAL '2 weeks', $3,
+(SELECT id FROM users WHERE users.uuid=$4 LIMIT 1),
+0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`,
+ uuid, token, string(scopesjson), userUUID)
+
+ if err != nil {
+ return nil, err
}
- return db.QueryRowContext(req.Context(), `SELECT api_client_authorizations.uuid, users.uuid FROM api_client_authorizations JOIN users on api_client_authorizations.user_id=users.id WHERE api_token=$1 AND (expires_at IS NULL OR expires_at > current_timestamp) LIMIT 1`, user.Authorization.APIToken).Scan(&user.Authorization.UUID, &user.UUID)
+
+ return &arvados.APIClientAuthorization{
+ UUID: uuid,
+ APIToken: token,
+ ExpiresAt: "",
+ Scopes: scopes}, nil
}
// Extract the auth token supplied in req, and replace it with a
// salted token for the remote cluster.
-func (h *Handler) saltAuthToken(req *http.Request, remote string) error {
+func (h *Handler) saltAuthToken(req *http.Request, remote string) (updatedReq *http.Request, err error) {
+ updatedReq = (&http.Request{
+ Method: req.Method,
+ URL: req.URL,
+ Header: req.Header,
+ Body: req.Body,
+ ContentLength: req.ContentLength,
+ Host: req.Host,
+ }).WithContext(req.Context())
+
creds := auth.NewCredentials()
- creds.LoadTokensFromHTTPRequest(req)
- if len(creds.Tokens) == 0 && req.Header.Get("Content-Type") == "application/x-www-form-encoded" {
+ creds.LoadTokensFromHTTPRequest(updatedReq)
+	if len(creds.Tokens) == 0 && updatedReq.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
// Override ParseForm's 10MiB limit by ensuring
// req.Body is a *http.maxBytesReader.
- req.Body = http.MaxBytesReader(nil, req.Body, 1<<28) // 256MiB. TODO: use MaxRequestSize from discovery doc or config.
- if err := creds.LoadTokensFromHTTPRequestBody(req); err != nil {
- return err
+ updatedReq.Body = http.MaxBytesReader(nil, updatedReq.Body, 1<<28) // 256MiB. TODO: use MaxRequestSize from discovery doc or config.
+ if err := creds.LoadTokensFromHTTPRequestBody(updatedReq); err != nil {
+ return nil, err
}
// Replace req.Body with a buffer that re-encodes the
// form without api_token, in case we end up
// forwarding the request.
- if req.PostForm != nil {
- req.PostForm.Del("api_token")
+ if updatedReq.PostForm != nil {
+ updatedReq.PostForm.Del("api_token")
}
- req.Body = ioutil.NopCloser(bytes.NewBufferString(req.PostForm.Encode()))
+ updatedReq.Body = ioutil.NopCloser(bytes.NewBufferString(updatedReq.PostForm.Encode()))
}
if len(creds.Tokens) == 0 {
- return nil
+ return updatedReq, nil
}
+
token, err := auth.SaltToken(creds.Tokens[0], remote)
+
if err == auth.ErrObsoleteToken {
// If the token exists in our own database, salt it
// for the remote. Otherwise, assume it was issued by
// the remote, and pass it through unmodified.
- currentUser := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: creds.Tokens[0]}}
- err = h.validateAPItoken(req, ¤tUser)
+ currentUser, err := h.validateAPItoken(req, creds.Tokens[0])
if err == sql.ErrNoRows {
// Not ours; pass through unmodified.
- token = currentUser.Authorization.APIToken
+ token = creds.Tokens[0]
} else if err != nil {
- return err
+ return nil, err
} else {
// Found; make V2 version and salt it.
token, err = auth.SaltToken(currentUser.Authorization.TokenV2(), remote)
if err != nil {
- return err
+ return nil, err
}
}
} else if err != nil {
- return err
+ return nil, err
+ }
+ updatedReq.Header = http.Header{}
+ for k, v := range req.Header {
+ if k != "Authorization" {
+ updatedReq.Header[k] = v
+ }
}
- req.Header.Set("Authorization", "Bearer "+token)
+ updatedReq.Header.Set("Authorization", "Bearer "+token)
	// Remove api_token=... from the query string, in case we
// end up forwarding the request.
- if values, err := url.ParseQuery(req.URL.RawQuery); err != nil {
- return err
+ if values, err := url.ParseQuery(updatedReq.URL.RawQuery); err != nil {
+ return nil, err
} else if _, ok := values["api_token"]; ok {
delete(values, "api_token")
- req.URL.RawQuery = values.Encode()
+ updatedReq.URL = &url.URL{
+ Scheme: req.URL.Scheme,
+ Host: req.URL.Host,
+ Path: req.URL.Path,
+ RawPath: req.URL.RawPath,
+ RawQuery: values.Encode(),
+ }
}
- return nil
+ return updatedReq, nil
}
package controller
import (
+ "bytes"
"encoding/json"
"fmt"
+ "io"
"io/ioutil"
"net/http"
"net/http/httptest"
}
func (s *FederationSuite) remoteMockHandler(w http.ResponseWriter, req *http.Request) {
+ b := &bytes.Buffer{}
+ io.Copy(b, req.Body)
+ req.Body.Close()
+ req.Body = ioutil.NopCloser(b)
s.remoteMockRequests = append(s.remoteMockRequests, *req)
}
s.testHandler.Cluster.NodeProfiles["*"] = np
s.testHandler.NodeProfile = &np
+ // HTTP GET
+
req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementCollection, nil)
req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
resp := s.testRequest(req)
c.Check(col.ManifestText, check.Matches,
`\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+A[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
`)
+
+ // HTTP POST with _method=GET as a form parameter
+
+ req = httptest.NewRequest("POST", "/arvados/v1/collections/"+arvadostest.UserAgreementCollection, bytes.NewBufferString((url.Values{
+ "_method": {"GET"},
+ }).Encode()))
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8")
+ resp = s.testRequest(req)
+
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ col = arvados.Collection{}
+ c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
+ c.Check(col.UUID, check.Equals, arvadostest.UserAgreementCollection)
+ c.Check(col.ManifestText, check.Matches,
+ `\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+A[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
+`)
}
func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
c.Check(strings.HasPrefix(cr.UUID, "zzzzz-"), check.Equals, true)
}
+func (s *FederationSuite) TestCreateRemoteContainerRequestCheckRuntimeToken(c *check.C) {
+ // Send request to zmock and check that outgoing request has
+ // runtime_token set with a new random v2 token.
+
+ defer s.localServiceReturns404(c).Close()
+	// Pass cluster_id via query parameter so that arvados-controller
+	// can avoid parsing the body.
+ req := httptest.NewRequest("POST", "/arvados/v1/container_requests?cluster_id=zmock",
+ strings.NewReader(`{
+ "container_request": {
+ "name": "hello world",
+ "state": "Uncommitted",
+ "output_path": "/",
+ "container_image": "123",
+ "command": ["abc"]
+ }
+}
+`))
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2)
+ req.Header.Set("Content-type", "application/json")
+
+ np := arvados.NodeProfile{
+ Controller: arvados.SystemServiceInstance{Listen: ":"},
+ RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
+ TLS: true, Insecure: true}}
+ s.testHandler.Cluster.ClusterID = "zzzzz"
+ s.testHandler.Cluster.NodeProfiles["*"] = np
+ s.testHandler.NodeProfile = &np
+
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var cr struct {
+ arvados.ContainerRequest `json:"container_request"`
+ }
+ c.Check(json.NewDecoder(s.remoteMockRequests[0].Body).Decode(&cr), check.IsNil)
+ c.Check(strings.HasPrefix(cr.ContainerRequest.RuntimeToken, "v2/zzzzz-gj3su-"), check.Equals, true)
+ c.Check(cr.ContainerRequest.RuntimeToken, check.Not(check.Equals), arvadostest.ActiveTokenV2)
+}
+
+func (s *FederationSuite) TestCreateRemoteContainerRequestCheckSetRuntimeToken(c *check.C) {
+ // Send request to zmock and check that outgoing request has
+ // runtime_token set with the explicitly provided token.
+
+ defer s.localServiceReturns404(c).Close()
+	// Pass cluster_id via query parameter so that arvados-controller
+	// can avoid parsing the body.
+ req := httptest.NewRequest("POST", "/arvados/v1/container_requests?cluster_id=zmock",
+ strings.NewReader(`{
+ "container_request": {
+ "name": "hello world",
+ "state": "Uncommitted",
+ "output_path": "/",
+ "container_image": "123",
+ "command": ["abc"],
+ "runtime_token": "xyz"
+ }
+}
+`))
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ req.Header.Set("Content-type", "application/json")
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var cr struct {
+ arvados.ContainerRequest `json:"container_request"`
+ }
+ c.Check(json.NewDecoder(s.remoteMockRequests[0].Body).Decode(&cr), check.IsNil)
+ c.Check(cr.ContainerRequest.RuntimeToken, check.Equals, "xyz")
+}
+
+func (s *FederationSuite) TestCreateRemoteContainerRequestRuntimeTokenFromAuth(c *check.C) {
+ // Send request to zmock and check that outgoing request has
+ // runtime_token set using the Auth token because the user is remote.
+
+ defer s.localServiceReturns404(c).Close()
+	// Pass cluster_id via query parameter so that arvados-controller
+	// can avoid parsing the body.
+ req := httptest.NewRequest("POST", "/arvados/v1/container_requests?cluster_id=zmock",
+ strings.NewReader(`{
+ "container_request": {
+ "name": "hello world",
+ "state": "Uncommitted",
+ "output_path": "/",
+ "container_image": "123",
+ "command": ["abc"]
+ }
+}
+`))
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2+"/zzzzz-dz642-parentcontainer")
+ req.Header.Set("Content-type", "application/json")
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var cr struct {
+ arvados.ContainerRequest `json:"container_request"`
+ }
+ c.Check(json.NewDecoder(s.remoteMockRequests[0].Body).Decode(&cr), check.IsNil)
+ c.Check(cr.ContainerRequest.RuntimeToken, check.Equals, arvadostest.ActiveTokenV2)
+}
+
func (s *FederationSuite) TestCreateRemoteContainerRequestError(c *check.C) {
defer s.localServiceReturns404(c).Close()
// pass cluster_id via query parameter, this allows arvados-controller
package controller
import (
+ "context"
"database/sql"
"errors"
"net"
req.URL.Path = strings.Replace(req.URL.Path, "//", "/", -1)
}
}
+ if h.Cluster.HTTPRequestTimeout > 0 {
+ ctx, cancel := context.WithDeadline(req.Context(), time.Now().Add(time.Duration(h.Cluster.HTTPRequestTimeout)))
+ req = req.WithContext(ctx)
+ defer cancel()
+ }
+
h.handlerStack.ServeHTTP(w, req)
}
h.insecureClient = &ic
h.proxy = &proxy{
- Name: "arvados-controller",
- RequestTimeout: time.Duration(h.Cluster.HTTPRequestTimeout),
+ Name: "arvados-controller",
}
}
})
}
-// localClusterRequest sets up a request so it can be proxied to the
-// local API server using proxy.Do(). Returns true if a response was
-// written, false if not.
-func (h *Handler) localClusterRequest(w http.ResponseWriter, req *http.Request, filter ResponseFilter) bool {
+func (h *Handler) localClusterRequest(req *http.Request) (*http.Response, error) {
urlOut, insecure, err := findRailsAPI(h.Cluster, h.NodeProfile)
if err != nil {
- httpserver.Error(w, err.Error(), http.StatusInternalServerError)
- return true
+ return nil, err
}
urlOut = &url.URL{
Scheme: urlOut.Scheme,
if insecure {
client = h.insecureClient
}
- return h.proxy.Do(w, req, urlOut, client, filter)
+ return h.proxy.Do(req, urlOut, client)
}
func (h *Handler) proxyRailsAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
- if !h.localClusterRequest(w, req, nil) && next != nil {
- next.ServeHTTP(w, req)
+ resp, err := h.localClusterRequest(req)
+ n, err := h.proxy.ForwardResponse(w, resp, err)
+ if err != nil {
+ httpserver.Logger(req).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
}
}
c.Check(resp.Code, check.Equals, http.StatusFound)
c.Check(resp.Header().Get("Location"), check.Matches, `https://0.0.0.0:1/auth/joshid\?return_to=foo&?`)
}
+
+func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
+ user, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveToken)
+ c.Assert(err, check.IsNil)
+ c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
+ c.Check(user.Authorization.APIToken, check.Equals, arvadostest.ActiveToken)
+ c.Check(user.Authorization.Scopes, check.DeepEquals, []string{"all"})
+ c.Check(user.UUID, check.Equals, arvadostest.ActiveUserUUID)
+}
+
+func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
+ user, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveTokenV2)
+ c.Assert(err, check.IsNil)
+ c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
+ c.Check(user.Authorization.APIToken, check.Equals, arvadostest.ActiveToken)
+ c.Check(user.Authorization.Scopes, check.DeepEquals, []string{"all"})
+ c.Check(user.UUID, check.Equals, arvadostest.ActiveUserUUID)
+ c.Check(user.Authorization.TokenV2(), check.Equals, arvadostest.ActiveTokenV2)
+}
+
+func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
+ auth, err := s.handler.(*Handler).createAPItoken(req, arvadostest.ActiveUserUUID, nil)
+ c.Assert(err, check.IsNil)
+ c.Check(auth.Scopes, check.DeepEquals, []string{"all"})
+
+ user, err := s.handler.(*Handler).validateAPItoken(req, auth.TokenV2())
+ c.Assert(err, check.IsNil)
+ c.Check(user.Authorization.UUID, check.Equals, auth.UUID)
+ c.Check(user.Authorization.APIToken, check.Equals, auth.APIToken)
+ c.Check(user.Authorization.Scopes, check.DeepEquals, []string{"all"})
+ c.Check(user.UUID, check.Equals, arvadostest.ActiveUserUUID)
+ c.Check(user.Authorization.TokenV2(), check.Equals, auth.TokenV2())
+}
package controller
import (
- "context"
"io"
"net/http"
"net/url"
- "time"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
)
type proxy struct {
- Name string // to use in Via header
- RequestTimeout time.Duration
+ Name string // to use in Via header
+}
+
+type HTTPError struct {
+ Message string
+ Code int
+}
+
+func (h HTTPError) Error() string {
+ return h.Message
}
// headers that shouldn't be forwarded when proxying. See
type ResponseFilter func(*http.Response, error) (*http.Response, error)
-// Do sends a request, passes the result to the filter (if provided)
-// and then if the result is not suppressed by the filter, sends the
-// request to the ResponseWriter. Returns true if a response was written,
-// false if not.
-func (p *proxy) Do(w http.ResponseWriter,
+// Do forwards a request to the upstream service and returns the
+// response or error.
+func (p *proxy) Do(
reqIn *http.Request,
urlOut *url.URL,
- client *http.Client,
- filter ResponseFilter) bool {
+ client *http.Client) (*http.Response, error) {
// Copy headers from incoming request, then add/replace proxy
// headers like Via and X-Forwarded-For.
}
hdrOut.Add("Via", reqIn.Proto+" arvados-controller")
- ctx := reqIn.Context()
- if p.RequestTimeout > 0 {
- var cancel context.CancelFunc
- ctx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Duration(p.RequestTimeout)))
- defer cancel()
- }
-
reqOut := (&http.Request{
Method: reqIn.Method,
URL: urlOut,
Host: reqIn.Host,
Header: hdrOut,
Body: reqIn.Body,
- }).WithContext(ctx)
+ }).WithContext(reqIn.Context())
resp, err := client.Do(reqOut)
- if filter == nil && err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadGateway)
- return true
- }
-
- // make sure original response body gets closed
- var originalBody io.ReadCloser
- if resp != nil {
- originalBody = resp.Body
- if originalBody != nil {
- defer originalBody.Close()
- }
- }
-
- if filter != nil {
- resp, err = filter(resp, err)
+ return resp, err
+}
- if err != nil {
+// ForwardResponse copies a response (or error) to the downstream client.
+func (p *proxy) ForwardResponse(w http.ResponseWriter, resp *http.Response, err error) (int64, error) {
+ if err != nil {
+ if he, ok := err.(HTTPError); ok {
+ httpserver.Error(w, he.Message, he.Code)
+ } else {
httpserver.Error(w, err.Error(), http.StatusBadGateway)
- return true
- }
- if resp == nil {
- // filter() returned a nil response, this means suppress
- // writing a response, for the case where there might
- // be multiple response writers.
- return false
- }
-
- // the filter gave us a new response body, make sure that gets closed too.
- if resp.Body != originalBody {
- defer resp.Body.Close()
}
+ return 0, nil
}
+ defer resp.Body.Close()
for k, v := range resp.Header {
for _, v := range v {
w.Header().Add(k, v)
}
}
w.WriteHeader(resp.StatusCode)
- n, err := io.Copy(w, resp.Body)
- if err != nil {
- httpserver.Logger(reqIn).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
- }
- return true
+ return io.Copy(w, resp.Body)
}
// APIClientAuthorization is an arvados#apiClientAuthorization resource.
type APIClientAuthorization struct {
- UUID string `json:"uuid"`
- APIToken string `json:"api_token"`
+ UUID string `json:"uuid,omitempty"`
+ APIToken string `json:"api_token,omitempty"`
+ ExpiresAt string `json:"expires_at,omitempty"`
+ Scopes []string `json:"scopes,omitempty"`
}
// APIClientAuthorizationList is an arvados#apiClientAuthorizationList resource.
UseExisting bool `json:"use_existing"`
LogUUID string `json:"log_uuid"`
OutputUUID string `json:"output_uuid"`
+ RuntimeToken string `json:"runtime_token"`
}
// Mount is special behavior to attach to a filesystem path or device.
type keepClient interface {
ReadAt(locator string, p []byte, off int) (int, error)
PutB(p []byte) (string, int, error)
+ LocalLocator(locator string) (string, error)
}
type apiClient interface {
"encoding/json"
"fmt"
"io"
- "log"
"os"
"path"
"regexp"
}
func (fs *collectionFileSystem) Sync() error {
- log.Printf("cfs.Sync()")
if fs.uuid == "" {
return nil
}
txt, err := fs.MarshalManifest(".")
if err != nil {
- log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
- return err
+ return fmt.Errorf("sync failed: %s", err)
}
coll := &Collection{
UUID: fs.uuid,
}
err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, fs.UpdateBody(coll), map[string]interface{}{"select": []string{"uuid"}})
if err != nil {
- log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
+ return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
}
- return err
+ return nil
}
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
return dn.treenode.Child(name, replace)
}
-// sync flushes in-memory data (for the children with the given names,
-// which must be children of dn) to persistent storage. Caller must
-// have write lock on dn and the named children.
+// sync flushes in-memory data and remote block references (for the
+// children with the given names, which must be children of dn) to
+// local persistent storage. Caller must have write lock on dn and the
+// named children.
func (dn *dirnode) sync(names []string) error {
type shortBlock struct {
fn *filenode
return nil
}
+ localLocator := map[string]string{}
for _, name := range names {
fn, ok := dn.inodes[name].(*filenode)
if !ok {
continue
}
for idx, seg := range fn.segments {
- seg, ok := seg.(*memSegment)
- if !ok {
- continue
- }
- if seg.Len() > maxBlockSize/2 {
- if err := flush([]shortBlock{{fn, idx}}); err != nil {
- return err
+ switch seg := seg.(type) {
+ case storedSegment:
+ loc, ok := localLocator[seg.locator]
+ if !ok {
+ var err error
+ loc, err = dn.fs.LocalLocator(seg.locator)
+ if err != nil {
+ return err
+ }
+ localLocator[seg.locator] = loc
}
- continue
- }
- if pendingLen+seg.Len() > maxBlockSize {
- if err := flush(pending); err != nil {
- return err
+ seg.locator = loc
+ fn.segments[idx] = seg
+ case *memSegment:
+ if seg.Len() > maxBlockSize/2 {
+ if err := flush([]shortBlock{{fn, idx}}); err != nil {
+ return err
+ }
+ continue
+ }
+ if pendingLen+seg.Len() > maxBlockSize {
+ if err := flush(pending); err != nil {
+ return err
+ }
+ pending = nil
+ pendingLen = 0
}
- pending = nil
- pendingLen = 0
+ pending = append(pending, shortBlock{fn, idx})
+ pendingLen += seg.Len()
+ default:
+ panic(fmt.Sprintf("can't sync segment type %T", seg))
}
- pending = append(pending, shortBlock{fn, idx})
- pendingLen += seg.Len()
}
}
return flush(pending)
}
-// caller must have read lock.
+// caller must have write lock.
func (dn *dirnode) marshalManifest(prefix string) (string, error) {
var streamLen int64
type filepart struct {
var subdirs string
var blocks []string
+ if len(dn.inodes) == 0 {
+ if prefix == "." {
+ return "", nil
+ }
+ // Express the existence of an empty directory by
+ // adding an empty file named `\056`, which (unlike
+ // the more obvious spelling `.`) is accepted by the
+ // API's manifest validator.
+ return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
+ }
+
names := make([]string, 0, len(dn.inodes))
for name := range dn.inodes {
names = append(names, name)
}
name := dirname + "/" + manifestUnescape(toks[2])
fnode, err := dn.createFileAndParents(name)
- if err != nil {
- return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
+ if fnode == nil && err == nil && length == 0 {
+ // Special case: an empty file used as
+ // a marker to preserve an otherwise
+ // empty directory in a manifest.
+ continue
+ }
+ if err != nil || (fnode == nil && length != 0) {
+ return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
}
// Map the stream offset/range coordinates to
// block/offset/range coordinates and add
return nil
}
-// only safe to call from loadManifest -- no locking
+// only safe to call from loadManifest -- no locking.
+//
+// If path is a "parent directory exists" marker (the last path
+// component is "."), the returned values are both nil.
func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
var node inode = dn
names := strings.Split(path, "/")
basename := names[len(names)-1]
- if !permittedName(basename) {
- err = fmt.Errorf("invalid file part %q in path %q", basename, path)
- return
- }
for _, name := range names[:len(names)-1] {
switch name {
case "", ".":
return
}
}
+ if basename == "." {
+ return
+ } else if !permittedName(basename) {
+ err = fmt.Errorf("invalid file part %q in path %q", basename, path)
+ return
+ }
_, err = node.Child(basename, func(child inode) (inode, error) {
switch child := child.(type) {
case nil:
import (
"bytes"
"crypto/md5"
+ "crypto/sha1"
"errors"
"fmt"
"io"
"os"
"regexp"
"runtime"
+ "strings"
"sync"
"testing"
"time"
var _ = check.Suite(&CollectionFSSuite{})
type keepClientStub struct {
- blocks map[string][]byte
+ blocks map[string][]byte
+ refreshable map[string]bool
sync.RWMutex
}
return locator, 1, nil
}
+var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
+
+func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
+ kcs.Lock()
+ defer kcs.Unlock()
+ if strings.Contains(locator, "+R") {
+ if len(locator) < 32 {
+ return "", fmt.Errorf("bad locator: %q", locator)
+ }
+ if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
+ return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
+ }
+ }
+ fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), time.Now().Add(time.Hour*24*14).Unix())
+ return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
+}
+
type CollectionFSSuite struct {
client *Client
coll Collection
fs CollectionFileSystem
- kc keepClient
+ kc *keepClientStub
}
func (s *CollectionFSSuite) SetUpTest(c *check.C) {
checkSize(11)
}
+func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
+ foo := "foo"
+ bar := "bar"
+ hash := map[string]string{
+ foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
+ bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
+ }
+
+ fs, err := (&Collection{
+ ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
+ }).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ manifest, err := fs.MarshalManifest(".")
+ c.Check(manifest, check.Equals, "")
+ c.Check(err, check.NotNil)
+
+ s.kc.refreshable = map[string]bool{hash[bar]: true}
+
+ for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
+ fs, err = (&Collection{
+ ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
+ }).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ manifest, err := fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ // Both blocks should now have +A signatures.
+ c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
+ c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
+ }
+}
+
func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
maxBlockSize = 8
defer func() { maxBlockSize = 2 << 26 }()
f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
c.Assert(err, check.IsNil)
defer f.Close()
- for i := 0; i < 6502; i++ {
+ for i := 0; i < 1024; i++ {
r := rand.Uint32()
switch {
case r%11 == 0:
c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
}
+func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ err = fs.Mkdir("foo", 0755)
+ c.Assert(err, check.IsNil)
+ err = fs.Mkdir("bar", 0755)
+ c.Assert(err, check.IsNil)
+ err = fs.Rename("bar", "baz")
+ c.Check(err, check.IsNil)
+ err = fs.Rename("foo", "baz")
+ c.Check(err, check.NotNil)
+ err = fs.Rename("foo", "baz/")
+ c.Check(err, check.IsNil)
+ err = fs.Rename("baz/foo", ".")
+ c.Check(err, check.Equals, ErrInvalidArgument)
+ err = fs.Rename("baz/foo/", ".")
+ c.Check(err, check.Equals, ErrInvalidArgument)
+}
+
func (s *CollectionFSSuite) TestRename(c *check.C) {
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
}
}
-func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
+func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
var err error
s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
- for _, name := range []string{"dir", "dir/zerodir", "zero", "zero/zero"} {
+ for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
err = s.fs.Mkdir(name, 0755)
c.Assert(err, check.IsNil)
}
c.Check(err, check.IsNil)
c.Check(buf, check.DeepEquals, data)
}
+
+ expectDir := map[string]int{
+ "empty": 0,
+ "not empty": 1,
+ "not empty/empty": 0,
+ }
+ for name, expectLen := range expectDir {
+ _, err := persisted.Open(name + "/bogus")
+ c.Check(err, check.NotNil)
+
+ d, err := persisted.Open(name)
+ defer d.Close()
+ c.Check(err, check.IsNil)
+ fi, err := d.Readdir(-1)
+ c.Check(err, check.IsNil)
+ c.Check(fi, check.HasLen, expectLen)
+ }
}
func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
"./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
} {
for _, txt := range []string{
"",
". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
- ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
} {
const (
SpectatorToken = "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu"
ActiveToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+ ActiveTokenUUID = "zzzzz-gj3su-077z32aux8dg2s1"
ActiveTokenV2 = "v2/zzzzz-gj3su-077z32aux8dg2s1/3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
AdminToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
AnonymousToken = "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi"
"time"
)
+const (
+ HeaderRequestID = "X-Request-Id"
+)
+
// IDGenerator generates alphanumeric strings suitable for use as
// unique IDs (a given IDGenerator will never return the same ID
// twice).
func AddRequestIDs(h http.Handler) http.Handler {
gen := &IDGenerator{Prefix: "req-"}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- if req.Header.Get("X-Request-Id") == "" {
+ if req.Header.Get(HeaderRequestID) == "" {
if req.Header == nil {
req.Header = http.Header{}
}
- req.Header.Set("X-Request-Id", gen.Next())
+ req.Header.Set(HeaderRequestID, gen.Next())
}
h.ServeHTTP(w, req)
})
}
}
-func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, int64, string, error) {
+func (kc *KeepClient) getOrHead(method string, locator string, header http.Header) (io.ReadCloser, int64, string, http.Header, error) {
if strings.HasPrefix(locator, "d41d8cd98f00b204e9800998ecf8427e+0") {
- return ioutil.NopCloser(bytes.NewReader(nil)), 0, "", nil
+ return ioutil.NopCloser(bytes.NewReader(nil)), 0, "", nil, nil
}
reqid := kc.getRequestID()
errs = append(errs, fmt.Sprintf("%s: %v", url, err))
continue
}
- req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
- req.Header.Add("X-Request-Id", reqid)
+ for k, v := range header {
+ req.Header[k] = append([]string(nil), v...)
+ }
+ if req.Header.Get("Authorization") == "" {
+ req.Header.Set("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
+ }
+ if req.Header.Get("X-Request-Id") == "" {
+ req.Header.Set("X-Request-Id", reqid)
+ }
resp, err := kc.httpClient().Do(req)
if err != nil {
// Probably a network error, may be transient,
if expectLength < 0 {
if resp.ContentLength < 0 {
resp.Body.Close()
- return nil, 0, "", fmt.Errorf("error reading %q: no size hint, no Content-Length header in response", locator)
+ return nil, 0, "", nil, fmt.Errorf("error reading %q: no size hint, no Content-Length header in response", locator)
}
expectLength = resp.ContentLength
} else if resp.ContentLength >= 0 && expectLength != resp.ContentLength {
resp.Body.Close()
- return nil, 0, "", fmt.Errorf("error reading %q: size hint %d != Content-Length %d", locator, expectLength, resp.ContentLength)
+ return nil, 0, "", nil, fmt.Errorf("error reading %q: size hint %d != Content-Length %d", locator, expectLength, resp.ContentLength)
}
// Success
if method == "GET" {
Reader: resp.Body,
Hash: md5.New(),
Check: locator[0:32],
- }, expectLength, url, nil
+ }, expectLength, url, resp.Header, nil
} else {
resp.Body.Close()
- return nil, expectLength, url, nil
+ return nil, expectLength, url, resp.Header, nil
}
}
serversToTry = retryList
isTemp: len(serversToTry) > 0,
}}
}
- return nil, 0, "", err
+ return nil, 0, "", nil, err
+}
+
+// LocalLocator returns a locator equivalent to the one supplied, but
+// with a valid signature from the local cluster. If the given locator
+// already has a local signature, it is returned unchanged.
+func (kc *KeepClient) LocalLocator(locator string) (string, error) {
+ if !strings.Contains(locator, "+R") {
+ // Either it has +A, or it's unsigned and we assume
+ // it's a local locator on a site with signatures
+ // disabled.
+ return locator, nil
+ }
+ sighdr := fmt.Sprintf("local, time=%s", time.Now().UTC().Format(time.RFC3339))
+ _, _, url, hdr, err := kc.getOrHead("HEAD", locator, http.Header{"X-Keep-Signature": []string{sighdr}})
+ if err != nil {
+ return "", err
+ }
+ loc := hdr.Get("X-Keep-Locator")
+ if loc == "" {
+ return "", fmt.Errorf("missing X-Keep-Locator header in HEAD response from %s", url)
+ }
+ return loc, nil
}
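The control flow of LocalLocator can be sketched on its own in Python, with the HEAD exchange abstracted behind an injected callable; the header names match the Go code above, but the `head` helper is a hypothetical stand-in, not an SDK API:

```python
def local_locator(locator, head):
    """Return a locally signed equivalent of locator.

    head is a hypothetical callable: it performs a HEAD request on the
    locator with the given extra headers and returns the response
    headers as a dict.
    """
    if '+R' not in locator:
        # Either it already has +A, or it's unsigned and assumed local
        # on a cluster with signatures disabled: return it unchanged.
        return locator
    resp = head(locator, {'X-Keep-Signature': 'local, time=2019-01-01T00:00:00Z'})
    loc = resp.get('X-Keep-Locator')
    if not loc:
        raise RuntimeError('missing X-Keep-Locator header in HEAD response')
    return loc
```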
// Get() retrieves a block, given a locator. Returns a reader, the
// reader returned by this method will return a BadChecksum error
// instead of EOF.
func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
- return kc.getOrHead("GET", locator)
+ rdr, size, url, _, err := kc.getOrHead("GET", locator, nil)
+ return rdr, size, url, err
}
// ReadAt() retrieves a portion of block from the cache if it's
// Returns the data size (content length) reported by the Keep service
// and the URI reporting the data size.
func (kc *KeepClient) Ask(locator string) (int64, string, error) {
- _, size, url, err := kc.getOrHead("HEAD", locator)
+ _, size, url, _, err := kc.getOrHead("HEAD", locator, nil)
return size, url, err
}
keepAlive = DefaultKeepAlive
}
- transport, ok := http.DefaultTransport.(*http.Transport)
- if ok {
- copy := *transport
- transport = &copy
- } else {
- // Evidently the application has replaced
- // http.DefaultTransport with a different type, so we
- // need to build our own from scratch using the Go 1.8
- // defaults.
- transport = &http.Transport{
+ c := &http.Client{
+ Timeout: requestTimeout,
+ // It's not safe to copy *http.DefaultTransport
+ // because it has a mutex (which might be locked)
+ // protecting a private map (which might not be nil).
+ // So we build our own, using the Go 1.10 default
+ // values, ignoring any changes the application has
+ // made to http.DefaultTransport.
+ Transport: &http.Transport{
+ DialContext: (&net.Dialer{
+ Timeout: connectTimeout,
+ KeepAlive: keepAlive,
+ DualStack: true,
+ }).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
+ TLSHandshakeTimeout: tlsTimeout,
ExpectContinueTimeout: time.Second,
- }
- }
- transport.DialContext = (&net.Dialer{
- Timeout: connectTimeout,
- KeepAlive: keepAlive,
- DualStack: true,
- }).DialContext
- transport.TLSHandshakeTimeout = tlsTimeout
- transport.TLSClientConfig = arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure)
- c := &http.Client{
- Timeout: requestTimeout,
- Transport: transport,
+ TLSClientConfig: arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure),
+ },
}
defaultClient[kc.Arvados.ApiInsecure][kc.foundNonDiskSvc] = c
return c
c.Check(n, Equals, int64(len(content)))
c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
}
+ {
+ loc, err := kc.LocalLocator(hash)
+ c.Check(err, Equals, nil)
+ c.Assert(len(loc) >= 32, Equals, true)
+ c.Check(loc[:32], Equals, hash[:32])
+ }
+ {
+ content := []byte("the perth county conspiracy")
+ loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
+ c.Check(loc, Equals, "")
+ c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
+ c.Check(err, ErrorMatches, `.*HTTP 400.*`)
+ }
}
type StubProxyHandler struct {
return True
return False
+ @synchronized
+ def has_remote_blocks(self):
+ """Returns True if any of the segment locators has a +R signature"""
+
+ for s in self._segments:
+ if '+R' in s.locator:
+ return True
+ return False
+
+ @synchronized
+ def _copy_remote_blocks(self, remote_blocks={}):
+ """Ask Keep to copy remote blocks and point to their local copies.
+
+ This is called from the parent Collection.
+
+ :remote_blocks:
+ Shared cache of remote to local block mappings. This is used to avoid
+ doing extra work when blocks are shared by more than one file in
+ different subdirectories.
+ """
+
+ for s in self._segments:
+ if '+R' in s.locator:
+ try:
+ loc = remote_blocks[s.locator]
+ except KeyError:
+ loc = self.parent._my_keep().refresh_signature(s.locator)
+ remote_blocks[s.locator] = loc
+ s.locator = loc
+ self.parent.set_committed(False)
+ return remote_blocks
+
@synchronized
def segments(self):
return copy.copy(self._segments)
def __init__(self, parent=None):
self.parent = parent
self._committed = False
+ self._has_remote_blocks = False
self._callback = None
self._items = {}
def stream_name(self):
raise NotImplementedError()
+ @synchronized
+ def has_remote_blocks(self):
+ """Recursively check for a +R segment locator signature."""
+
+ if self._has_remote_blocks:
+ return True
+ for item in self:
+ if self[item].has_remote_blocks():
+ return True
+ return False
+
+ @synchronized
+ def set_has_remote_blocks(self, val):
+ self._has_remote_blocks = val
+ if self.parent:
+ self.parent.set_has_remote_blocks(val)
+
@must_be_writable
@synchronized
def find_or_create(self, path, create_type):
self._items[target_name] = item
self.set_committed(False)
+ if not self._has_remote_blocks and source_obj.has_remote_blocks():
+ self.set_has_remote_blocks(True)
if modified_from:
self.notify(MOD, self, target_name, (modified_from, item))
else:
return self._manifest_text
+ @synchronized
+ def _copy_remote_blocks(self, remote_blocks={}):
+ """Scan through the entire collection and ask Keep to copy remote blocks.
+
+ When accessing a remote collection, blocks will have a remote signature
+ (+R instead of +A). Collect these signatures and request Keep to copy the
+ blocks to the local cluster, returning local (+A) signatures.
+
+ :remote_blocks:
+ Shared cache of remote to local block mappings. This is used to avoid
+ doing extra work when blocks are shared by more than one file in
+ different subdirectories.
+
+ """
+ for item in self:
+ remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
+ return remote_blocks
+
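The shared `remote_blocks` cache is what keeps a block referenced by several files from being refreshed more than once. That memoization, as a standalone sketch (the segment dicts and `refresh_signature` callable here are simplified stand-ins, not the SDK's actual classes):

```python
def copy_remote_blocks(segments, refresh_signature, remote_blocks=None):
    # remote_blocks maps a remote (+R) locator to its refreshed local
    # (+A) locator, so each unique block is refreshed only once even
    # when it is shared by files in different subdirectories.
    if remote_blocks is None:
        remote_blocks = {}
    for seg in segments:
        if '+R' in seg['locator']:
            if seg['locator'] not in remote_blocks:
                remote_blocks[seg['locator']] = refresh_signature(seg['locator'])
            seg['locator'] = remote_blocks[seg['locator']]
    return remote_blocks
```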
@synchronized
def diff(self, end_collection, prefix=".", holding_collection=None):
"""Generate list of add/modify/delete actions.
self._manifest_locator = manifest_locator_or_text
elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
+ if not self._has_local_collection_uuid():
+ self._has_remote_blocks = True
elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
self._manifest_text = manifest_locator_or_text
+ if '+R' in self._manifest_text:
+ self._has_remote_blocks = True
else:
raise errors.ArgumentError(
"Argument to CollectionReader is not a manifest or a collection UUID")
def _has_collection_uuid(self):
return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
+ def _has_local_collection_uuid(self):
+ return self._has_collection_uuid() and \
+ self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
+
def __enter__(self):
return self
body["trash_at"] = t
if not self.committed():
+ if self._has_remote_blocks:
+ # Copy any remote blocks to the local cluster.
+ self._copy_remote_blocks(remote_blocks={})
+ self._has_remote_blocks = False
if not self._has_collection_uuid():
raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
+ elif not self._has_local_collection_uuid():
+ raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
self._my_block_manager().commit_all()
if trash_at and type(trash_at) is not datetime.datetime:
raise errors.ArgumentError("trash_at must be datetime type.")
+ if self._has_remote_blocks:
+ # Copy any remote blocks to the local cluster.
+ self._copy_remote_blocks(remote_blocks={})
+ self._has_remote_blocks = False
+
self._my_block_manager().commit_all()
text = self.manifest_text(strip=False)
_block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
_segment_re = re.compile(r'(\d+):(\d+):(\S+)')
+ def _unescape_manifest_path(self, path):
+ return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
+
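Manifest tokens encode spaces and other special characters as three-digit octal escapes, which `_unescape_manifest_path` reverses. The same transform as a self-contained sketch:

```python
import re

def unescape_manifest_path(path):
    # "\NNN" (a backslash plus three octal digits) decodes to chr(0oNNN),
    # e.g. "\040" is a space, "\056" is "." and "\057" is "/".
    return re.sub(r'\\([0-3][0-7][0-7])',
                  lambda m: chr(int(m.group(1), 8)), path)

print(unescape_manifest_path('file\\040name'))    # → file name
print(unescape_manifest_path('\\056\\057\\056'))  # → ./.
```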
@synchronized
def _import_manifest(self, manifest_text):
"""Import a manifest into a `Collection`.
if state == STREAM_NAME:
# starting a new stream
- stream_name = tok.replace('\\040', ' ')
+ stream_name = self._unescape_manifest_path(tok)
blocks = []
segments = []
streamoffset = 0
if file_segment:
pos = int(file_segment.group(1))
size = int(file_segment.group(2))
- name = file_segment.group(3).replace('\\040', ' ')
- filepath = os.path.join(stream_name, name)
- afile = self.find_or_create(filepath, FILE)
- if isinstance(afile, ArvadosFile):
- afile.add_segment(blocks, pos, size)
+ name = self._unescape_manifest_path(file_segment.group(3))
+ if name.split('/')[-1] == '.':
+ # placeholder for persisting an empty directory, not a real file
+ if len(name) > 2:
+ self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
else:
- raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
+ filepath = os.path.join(stream_name, name)
+ afile = self.find_or_create(filepath, FILE)
+ if isinstance(afile, ArvadosFile):
+ afile.add_segment(blocks, pos, size)
+ else:
+ raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
else:
# error!
raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
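The new branch above treats a file entry whose last path component is "." as an empty-directory placeholder rather than a real file. That dispatch can be sketched in isolation, returning a (kind, path) tuple instead of mutating a collection; the function name and return shape are illustrative only:

```python
import os

def classify_segment_name(stream_name, name):
    """Return ('DIR', path) for an empty-directory placeholder,
    ('FILE', path) for a normal file entry, or None when the
    placeholder refers to the stream directory itself."""
    if name.split('/')[-1] == '.':
        if len(name) > 2:
            # "foo/." marks a (possibly empty) directory "foo"
            return ('DIR', os.path.join(stream_name, name[:-2]))
        # bare "." -- the stream directory already exists
        return None
    return ('FILE', os.path.join(stream_name, name))
```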
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
if method == "HEAD":
curl.setopt(pycurl.NOBODY, True)
- self._setcurltimeouts(curl, timeout)
+ self._setcurltimeouts(curl, timeout, method=="HEAD")
try:
curl.perform()
_logger.info("HEAD %s: %s bytes",
self._result['status_code'],
self._result.get('content-length'))
+ if self._result['headers'].get('x-keep-locator'):
+ # This is a response to a remote block copy request, return
+ # the local copy block locator.
+ return self._result['headers'].get('x-keep-locator')
return True
_logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
self.upload_counter.add(len(body))
return True
- def _setcurltimeouts(self, curl, timeouts):
+ def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
if not timeouts:
return
elif isinstance(timeouts, tuple):
conn_t, xfer_t = (timeouts, timeouts)
bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
- curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
- curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+ if not ignore_bandwidth:
+ curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+ curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
def _headerfunction(self, header_line):
if isinstance(header_line, bytes):
else:
return None
+ def refresh_signature(self, loc):
+ """Ask Keep to get the remote block and return its local signature"""
+ now = datetime.datetime.utcnow().isoformat("T") + 'Z'
+ return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
+
@retry.retry_method
def head(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="HEAD", **kwargs)
def get(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="GET", **kwargs)
- def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
+ def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
self.misses_counter.add(1)
- headers = {
- 'X-Request-Id': (request_id or
- (hasattr(self, 'api_client') and self.api_client.request_id) or
- arvados.util.new_request_id()),
- }
+ if headers is None:
+ headers = {}
+ headers['X-Request-Id'] = (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id())
# If the locator has hints specifying a prefix (indicating a
# remote keepproxy) or the UUID of a local gateway service,
# Always cache the result, then return it if we succeeded.
if loop.success():
- if method == "HEAD":
- return True
- else:
- return blob
+ return blob
finally:
if slot is not None:
slot.set(blob)
class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
KEEP_SERVER = {}
+ local_locator_re = r"[0-9a-f]{32}\+\d+\+A[a-f0-9]{40}@[a-f0-9]{8}"
+ remote_locator_re = r"[0-9a-f]{32}\+\d+\+R[a-z]{5}-[a-f0-9]{40}@[a-f0-9]{8}"
def setUp(self):
self.keep_put = getattr(arvados.keep.KeepClient, 'put')
- def test_repacked_block_submission_get_permission_token(self):
+ @mock.patch('arvados.keep.KeepClient.put', autospec=True)
+ def test_repacked_block_submission_get_permission_token(self, mocked_put):
'''
Make sure that those blocks that are committed after repacking small ones,
get their permission tokens assigned on the collection manifest.
time.sleep(1)
return self.keep_put(*args, **kwargs)
- re_locator = "[0-9a-f]{32}\+\d+\+A[a-f0-9]{40}@[a-f0-9]{8}"
-
- with mock.patch('arvados.keep.KeepClient.put', autospec=True) as mocked_put:
- mocked_put.side_effect = wrapped_keep_put
- c = Collection()
- # Write 70 files ~1MiB each so we force to produce 1 big block by repacking
- # small ones before finishing the upload.
- for i in range(70):
- f = c.open("file_{}.txt".format(i), 'wb')
- f.write(random.choice('abcdefghijklmnopqrstuvwxyz') * (2**20+i))
- f.close(flush=False)
- # We should get 2 blocks with their tokens
- self.assertEqual(len(re.findall(re_locator, c.manifest_text())), 2)
+ mocked_put.side_effect = wrapped_keep_put
+ c = Collection()
+ # Write 70 files of ~1MiB each to force the creation of one big block by
+ # repacking small ones before finishing the upload.
+ for i in range(70):
+ f = c.open("file_{}.txt".format(i), 'wb')
+ f.write(random.choice('abcdefghijklmnopqrstuvwxyz') * (2**20+i))
+ f.close(flush=False)
+ # We should get 2 blocks with their tokens
+ self.assertEqual(len(re.findall(self.local_locator_re, c.manifest_text())), 2)
+
+ @mock.patch('arvados.keep.KeepClient.refresh_signature')
+ def test_copy_remote_blocks_on_save_new(self, rs_mock):
+ remote_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+Remote-" + "a" * 40 + "@abcdef01"
+ local_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+A" + "b" * 40 + "@abcdef01"
+ rs_mock.return_value = local_block_loc
+ c = Collection(". " + remote_block_loc + " 0:3:foofile.txt\n")
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, c.manifest_text())), 1)
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, c.manifest_text())), 0)
+ c.save_new()
+ rs_mock.assert_called()
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, c.manifest_text())), 0)
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, c.manifest_text())), 1)
+
+ @mock.patch('arvados.keep.KeepClient.refresh_signature')
+ def test_copy_remote_blocks_on_save(self, rs_mock):
+ remote_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+Remote-" + "a" * 40 + "@abcdef01"
+ local_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+A" + "b" * 40 + "@abcdef01"
+ rs_mock.return_value = local_block_loc
+ # Remote collection
+ remote_c = Collection(". " + remote_block_loc + " 0:3:foofile.txt\n")
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, remote_c.manifest_text())), 1)
+ # Local collection
+ local_c = Collection()
+ with local_c.open('barfile.txt', 'wb') as f:
+ f.write('bar')
+ local_c.save_new()
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, local_c.manifest_text())), 1)
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, local_c.manifest_text())), 0)
+ # Copy remote file to local collection
+ local_c.copy('./foofile.txt', './copied/foofile.txt', remote_c)
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, local_c.manifest_text())), 1)
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, local_c.manifest_text())), 1)
+ # Save local collection: remote block should be copied
+ local_c.save()
+ rs_mock.assert_called()
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, local_c.manifest_text())), 2)
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, local_c.manifest_text())), 0)
class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
None)
+ def test_refresh_signature(self):
+ blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
+ blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
+ local_loc = blk_digest+'+A'+blk_sig
+ remote_loc = blk_digest+'+R'+blk_sig
+ api_client = self.mock_keep_services(count=1)
+ headers = {'X-Keep-Locator':local_loc}
+ with tutil.mock_keep_responses('', 200, **headers):
+ # Check that the translated locator gets returned
+ keep_client = arvados.KeepClient(api_client=api_client)
+ self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
+ # Check that refresh_signature() uses the correct method and headers
+ keep_client._get_or_head = mock.MagicMock()
+ keep_client.refresh_signature(remote_loc)
+ args, kwargs = keep_client._get_or_head.call_args_list[0]
+ self.assertIn(remote_loc, args)
+ self.assertEqual("HEAD", kwargs['method'])
+ self.assertIn('X-Keep-Signature', kwargs['headers'])
+
# test_*_timeout verify that KeepClient instructs pycurl to use
# the appropriate connection and read timeouts. They don't care
# whether pycurl actually exhibits the expected timeout behavior
int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
self.assertEqual(
mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
- int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+ None)
self.assertEqual(
mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
- int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
+ None)
def test_proxy_get_timeout(self):
api_client = self.mock_keep_services(service_type='proxy', count=1)
int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
self.assertEqual(
mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
- int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+ None)
self.assertEqual(
mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
- int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
+ None)
def test_proxy_put_timeout(self):
api_client = self.mock_keep_services(service_type='proxy', count=1)
self.assertEqual(1, req_mock.call_count)
+@tutil.skip_sleep
+class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+ def setUp(self):
+ self.api_client = self.mock_keep_services(count=2)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.data = b'xyzzy'
+ self.locator = '1271ed5ef305aadabc605b1609e24c52'
+
+ @mock.patch('arvados.KeepClient.KeepService.get')
+ def test_get_request_cache(self, get_mock):
+ with tutil.mock_keep_responses(self.data, 200, 200):
+ self.keep_client.get(self.locator)
+ self.keep_client.get(self.locator)
+ # Request already cached, don't require more than one request
+ get_mock.assert_called_once()
+
+ @mock.patch('arvados.KeepClient.KeepService.get')
+ def test_head_request_cache(self, get_mock):
+ with tutil.mock_keep_responses(self.data, 200, 200):
+ self.keep_client.head(self.locator)
+ self.keep_client.head(self.locator)
+ # Don't cache HEAD requests so that they're not confused with GET requests
+ self.assertEqual(2, get_mock.call_count)
+
+ @mock.patch('arvados.KeepClient.KeepService.get')
+ def test_head_and_then_get_return_different_responses(self, get_mock):
+ head_resp = None
+ get_resp = None
+ get_mock.side_effect = ['first response', 'second response']
+ with tutil.mock_keep_responses(self.data, 200, 200):
+ head_resp = self.keep_client.head(self.locator)
+ get_resp = self.keep_client.get(self.locator)
+ self.assertEqual('first response', head_resp)
+ # First response was not cached because it was from a HEAD request.
+ self.assertNotEqual(head_resp, get_resp)
+
+
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
def setUp(self):
loc = kc.put(self.DATA, copies=1, num_retries=0)
self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
with self.assertTakesGreater(self.TIMEOUT_TIME):
- with self.assertRaises(arvados.errors.KeepReadError) as e:
+ with self.assertRaises(arvados.errors.KeepReadError):
kc.get(loc, num_retries=0)
with self.assertTakesGreater(self.TIMEOUT_TIME):
with self.assertRaises(arvados.errors.KeepWriteError):
self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
self.server.setdelays(response=self.TIMEOUT_TIME)
with self.assertTakesGreater(self.TIMEOUT_TIME):
- with self.assertRaises(arvados.errors.KeepReadError) as e:
+ with self.assertRaises(arvados.errors.KeepReadError):
kc.get(loc, num_retries=0)
with self.assertTakesGreater(self.TIMEOUT_TIME):
with self.assertRaises(arvados.errors.KeepWriteError):
kc.put(self.DATA, copies=1, num_retries=0)
with self.assertTakesGreater(self.TIMEOUT_TIME):
- with self.assertRaises(arvados.errors.KeepReadError) as e:
- kc.head(loc, num_retries=0)
+ kc.head(loc, num_retries=0)
def test_low_bandwidth_with_server_mid_delay_failure(self):
kc = self.keepClient()
railties (>= 4)
request_store (~> 1.0)
logstash-event (1.2.02)
- loofah (2.2.2)
+ loofah (2.2.3)
crass (~> 1.0.2)
nokogiri (>= 1.5.9)
mail (2.7.0)
net-ssh (4.2.0)
net-ssh-gateway (2.0.0)
net-ssh (>= 4.0.0)
- nokogiri (1.8.2)
+ nokogiri (1.8.5)
mini_portile2 (~> 2.3.0)
oauth2 (1.4.0)
faraday (>= 0.8, < 0.13)
limit: { type: 'integer', required: false, default: DEFAULT_LIMIT },
offset: { type: 'integer', required: false, default: 0 },
count: { type: 'string', required: false, default: 'exact' },
+ cluster_id: {
+ type: 'string',
+ description: "List objects on a remote federated cluster instead of the current one.",
+ location: "query",
+ required: false,
+ },
}
end
def account_is_setup(user)
@user = user
- mail(to: user.email, subject: 'Welcome to Curoverse - shell account enabled')
+ mail(to: user.email, subject: 'Welcome to Arvados - shell account enabled')
end
end
if loc = Keep::Locator.parse(search_term)
loc.strip_hints!
coll_match = readable_by(*readers).where(portable_data_hash: loc.to_s).limit(1)
- return get_compatible_images(readers, pattern, coll_match)
+ if coll_match.any? or Rails.configuration.remote_hosts.length == 0
+ return get_compatible_images(readers, pattern, coll_match)
+ else
+ # Allow a bare PDH that doesn't exist in the local database so
+ # that federated container requests which refer to remotely
+ # stored containers will validate.
+ return [Collection.new(portable_data_hash: loc.to_s)]
+ end
end
if search_tag.nil? and (n = search_term.index(":"))
if mount['kind'] != 'collection'
next
end
- if (uuid = mount.delete 'uuid')
+
+ uuid = mount.delete 'uuid'
+
+ if mount['portable_data_hash'].nil? and !uuid.nil?
+ # PDH not supplied, try by UUID
c = Collection.
readable_by(current_user).
where(uuid: uuid).
if !c
raise ArvadosModel::UnresolvableContainerError.new "cannot mount collection #{uuid.inspect}: not found"
end
- if mount['portable_data_hash'].nil?
- # PDH not supplied by client
- mount['portable_data_hash'] = c.portable_data_hash
- elsif mount['portable_data_hash'] != c.portable_data_hash
- # UUID and PDH supplied by client, but they don't agree
- raise ArgumentError.new "cannot mount collection #{uuid.inspect}: current portable_data_hash #{c.portable_data_hash.inspect} does not match #{c['portable_data_hash'].inspect} in request"
- end
+ mount['portable_data_hash'] = c.portable_data_hash
end
end
return c_mounts
return false
end
- if current_api_client_authorization.andand.uuid.andand == self.auth_uuid
- # The contained process itself can update progress indicators,
- # but can't change priority etc.
- permitted = permitted & (progress_attrs + final_attrs + [:state] - [:log])
+ if self.state == Running &&
+ !current_api_client_authorization.nil? &&
+ (current_api_client_authorization.uuid == self.auth_uuid ||
+ current_api_client_authorization.token == self.runtime_token)
+ # The contained process itself can write final attrs but can't
+ # change priority or log.
+ permitted.push *final_attrs
+ permitted = permitted - [:log, :priority]
elsif self.locked_by_uuid && self.locked_by_uuid != current_api_client_authorization.andand.uuid
# When locked, progress fields cannot be updated by the wrong
# dispatcher, even though it has admin privileges.
assert_equal Rails.configuration.user_notifier_email_from, setup_email.from[0]
assert_equal 'foo@example.com', setup_email.to[0]
- assert_equal 'Welcome to Curoverse - shell account enabled', setup_email.subject
+ assert_equal 'Welcome to Arvados - shell account enabled', setup_email.subject
assert (setup_email.body.to_s.include? 'Your Arvados shell account has been set up'),
'Expected Your Arvados shell account has been set up in email body'
assert (setup_email.body.to_s.include? "#{Rails.configuration.workbench_address}users/#{created['uuid']}/virtual_machines"), 'Expected virtual machines url in email body'
ready.pop
@remote_server = srv
@remote_host = "127.0.0.1:#{srv.config[:Port]}"
- Rails.configuration.remote_hosts['zbbbb'] = @remote_host
- Rails.configuration.remote_hosts['zbork'] = @remote_host
+ Rails.configuration.remote_hosts = Rails.configuration.remote_hosts.merge({'zbbbb' => @remote_host,
+ 'zbork' => @remote_host})
Arvados::V1::SchemaController.any_instance.stubs(:root_url).returns "https://#{@remote_host}"
@stub_status = 200
@stub_content = {
"path" => "/foo",
}
end],
+ [{"/out" => {
+ "kind" => "collection",
+ "portable_data_hash" => "1f4b0bc7583c2a7f9102c395f4ffc5e3+45",
+ "path" => "/foo"}},
+ lambda do |resolved|
+ resolved["/out"] == {
+ "portable_data_hash" => "1f4b0bc7583c2a7f9102c395f4ffc5e3+45",
+ "kind" => "collection",
+ "path" => "/foo",
+ }
+ end],
+ # Empty collection
+ [{"/out" => {
+ "kind" => "collection",
+ "path" => "/foo"}},
+ lambda do |resolved|
+ resolved["/out"] == {
+ "kind" => "collection",
+ "path" => "/foo",
+ }
+ end],
].each do |mounts, okfunc|
test "resolve mounts #{mounts.inspect} to values" do
set_user_from_auth :active
"path" => "/foo",
},
}
- assert_raises(ArgumentError) do
- Container.resolve_mounts(m)
- end
+ resolved_mounts = Container.resolve_mounts(m)
+ assert_equal m['portable_data_hash'], resolved_mounts['portable_data_hash']
end
['arvados/apitestfixture:latest',
end
end
+ test "allow unrecognized container when there are remote_hosts" do
+ set_user_from_auth :active
+ Rails.configuration.remote_hosts = {"foooo" => "bar.com"}
+ Container.resolve_container_image('acbd18db4cc2f85cedef654fccc4a4d8+3')
+ end
+
test "migrated docker image" do
Rails.configuration.docker_image_formats = ['v2']
add_docker19_migration_link
assert_equal [logpdh_time2], Collection.where(uuid: [cr1log_uuid, cr2log_uuid]).to_a.collect(&:portable_data_hash).uniq
end
- test "auth_uuid can set output, progress, runtime_status, state on running container -- but not log" do
- set_user_from_auth :active
- c, _ = minimal_new
- set_user_from_auth :dispatch1
- c.lock
- c.update_attributes! state: Container::Running
-
- auth = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
- Thread.current[:api_client_authorization] = auth
- Thread.current[:api_client] = auth.api_client
- Thread.current[:token] = auth.token
- Thread.current[:user] = auth.user
+ ["auth_uuid", "runtime_token"].each do |tok|
+ test "#{tok} can set output, progress, runtime_status, state on running container -- but not log" do
+ if tok == "runtime_token"
+ set_user_from_auth :spectator
+ c, _ = minimal_new(container_image: "9ae44d5792468c58bcf85ce7353c7027+124",
+ runtime_token: api_client_authorizations(:active).token)
+ else
+ set_user_from_auth :active
+ c, _ = minimal_new
+ end
+ set_user_from_auth :dispatch1
+ c.lock
+ c.update_attributes! state: Container::Running
+
+ if tok == "runtime_token"
+ auth = ApiClientAuthorization.validate(token: c.runtime_token)
+ Thread.current[:api_client_authorization] = auth
+ Thread.current[:api_client] = auth.api_client
+ Thread.current[:token] = auth.token
+ Thread.current[:user] = auth.user
+ else
+ auth = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
+ Thread.current[:api_client_authorization] = auth
+ Thread.current[:api_client] = auth.api_client
+ Thread.current[:token] = auth.token
+ Thread.current[:user] = auth.user
+ end
- assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
- assert c.update_attributes(runtime_status: {'warning' => 'something happened'})
- assert c.update_attributes(progress: 0.5)
- refute c.update_attributes(log: collections(:real_log_collection).portable_data_hash)
- c.reload
- assert c.update_attributes(state: Container::Complete, exit_code: 0)
+ assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
+ assert c.update_attributes(runtime_status: {'warning' => 'something happened'})
+ assert c.update_attributes(progress: 0.5)
+ refute c.update_attributes(log: collections(:real_log_collection).portable_data_hash)
+ c.reload
+ assert c.update_attributes(state: Container::Complete, exit_code: 0)
+ end
end
test "not allowed to set output that is not readable by current user" do
'locator' => BAD_COLLECTION,
}.each_pair do |spec_type, image_spec|
test "Job validation fails with nonexistent Docker image #{spec_type}" do
+ Rails.configuration.remote_hosts = {}
job = Job.new job_attrs(runtime_constraints:
{'docker_image' => image_spec})
assert(job.invalid?, "nonexistent Docker image #{spec_type} was valid")
# Test the body of the sent email contains what we expect it to
assert_equal Rails.configuration.user_notifier_email_from, email.from.first
assert_equal user.email, email.to.first
- assert_equal 'Welcome to Curoverse - shell account enabled', email.subject
+ assert_equal 'Welcome to Arvados - shell account enabled', email.subject
assert (email.body.to_s.include? 'Your Arvados shell account has been set up'),
'Expected Your Arvados shell account has been set up in email body'
assert (email.body.to_s.include? Rails.configuration.workbench_address),
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
- "github.com/shirou/gopsutil/process"
"golang.org/x/net/context"
dockertypes "github.com/docker/docker/api/types"
PutB(buf []byte) (string, int, error)
ReadAt(locator string, p []byte, off int) (int, error)
ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
+ LocalLocator(locator string) (string, error)
ClearBlockCache()
}
ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
+ ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error)
ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
- Docker ThinDockerClient
- client *arvados.Client
- ArvClient IArvadosClient
- Kc IKeepClient
+ Docker ThinDockerClient
+
+ // Dispatcher client is initialized with the Dispatcher token.
+ // This is a privileged token used to manage container status
+ // and logs.
+ //
+ // We have both dispatcherClient and DispatcherArvClient
+ // because there are two different incompatible Arvados Go
+ // SDKs and we have to use both (hopefully this gets fixed in
+ // #14467)
+ dispatcherClient *arvados.Client
+ DispatcherArvClient IArvadosClient
+ DispatcherKeepClient IKeepClient
+
+ // Container client is initialized with the Container token.
+ // This token controls the permissions of the container, and
+ // must be used for operations such as reading collections.
+ //
+ // Same comment as above applies to
+ // containerClient/ContainerArvClient.
+ containerClient *arvados.Client
+ ContainerArvClient IArvadosClient
+ ContainerKeepClient IKeepClient
+
Container arvados.Container
ContainerConfig dockercontainer.Config
HostConfig dockercontainer.HostConfig
SigChan chan os.Signal
ArvMountExit chan error
SecretMounts map[string]arvados.Mount
- MkArvClient func(token string) (IArvadosClient, error)
+ MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
finalState string
parentTemp string
- ListProcesses func() ([]PsProcess, error)
-
statLogger io.WriteCloser
statReporter *crunchstat.Reporter
hoststatLogger io.WriteCloser
cStateLock sync.Mutex
cCancelled bool // StopContainer() invoked
+ cRemoved bool // docker confirmed the container no longer exists
+
+ enableNetwork string // one of "default" or "always"
+ networkMode string // passed through to HostConfig.NetworkMode
+ arvMountLog *ThrottledLogger
- enableNetwork string // one of "default" or "always"
- networkMode string // passed through to HostConfig.NetworkMode
- arvMountLog *ThrottledLogger
- checkContainerd time.Duration
+ containerWatchdogInterval time.Duration
}
// setupSignals sets up signal handling to gracefully terminate the underlying
if err != nil {
runner.CrunchLog.Printf("error removing container: %s", err)
}
+ if err == nil || strings.Contains(err.Error(), "No such container: "+runner.ContainerID) {
+ runner.cRemoved = true
+ }
}
var errorBlacklist = []string{
runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
var collection arvados.Collection
- err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
+ err = runner.ContainerArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
if err != nil {
return fmt.Errorf("While getting container image collection: %v", err)
}
runner.CrunchLog.Print("Loading Docker image from keep")
var readCloser io.ReadCloser
- readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
+ readCloser, err = runner.ContainerKeepClient.ManifestFileReader(manifest, img)
if err != nil {
return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
}
runner.ContainerConfig.Image = imageID
- runner.Kc.ClearBlockCache()
+ runner.ContainerKeepClient.ClearBlockCache()
return nil
}
if err != nil {
return fmt.Errorf("creating temp dir: %v", err)
}
- err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token)
+ err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
if err != nil {
return err
}
return false, err
}
w := &ArvLogWriter{
- ArvClient: runner.ArvClient,
+ ArvClient: runner.DispatcherArvClient,
UUID: runner.Container.UUID,
loggingStream: label,
writeCloser: writer,
}
- reader, err := runner.ArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
+ reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
if err != nil {
return false, fmt.Errorf("error getting %s record: %v", label, err)
}
if collId == "" {
collId = stdinMnt.PortableDataHash
}
- err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+ err = runner.ContainerArvClient.Get("collections", collId, nil, &stdinColl)
if err != nil {
- return fmt.Errorf("While getting stding collection: %v", err)
+ return fmt.Errorf("While getting stdin collection: %v", err)
}
- stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+ stdinRdr, err = runner.ContainerKeepClient.ManifestFileReader(
+ manifest.Manifest{Text: stdinColl.ManifestText},
+ stdinMnt.Path)
if os.IsNotExist(err) {
return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
} else if err != nil {
return nil
}
-// checkContainerd checks if "containerd" is present in the process list.
-func (runner *ContainerRunner) CheckContainerd() error {
- if runner.checkContainerd == 0 {
- return nil
- }
- p, _ := runner.ListProcesses()
- for _, i := range p {
- e, _ := i.CmdlineSlice()
- if len(e) > 0 {
- if strings.Index(e[0], "containerd") > -1 {
- return nil
- }
- }
- }
-
- // Not found
- runner.runBrokenNodeHook()
- runner.stop(nil)
- return fmt.Errorf("'containerd' not found in process list.")
-}
-
// WaitFinish waits for the container to terminate, captures the exit code, and
// closes the stdout/stderr logging.
func (runner *ContainerRunner) WaitFinish() error {
runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
}
- containerdGone := make(chan error)
- defer close(containerdGone)
- if runner.checkContainerd > 0 {
- go func() {
- ticker := time.NewTicker(time.Duration(runner.checkContainerd))
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- if ck := runner.CheckContainerd(); ck != nil {
- containerdGone <- ck
- return
- }
- case <-containerdGone:
- // Channel closed, quit goroutine
- return
- }
+ containerGone := make(chan struct{})
+ go func() {
+ defer close(containerGone)
+ if runner.containerWatchdogInterval < 1 {
+ runner.containerWatchdogInterval = time.Minute
+ }
+ for range time.NewTicker(runner.containerWatchdogInterval).C {
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
+ ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
+ cancel()
+ runner.cStateLock.Lock()
+ done := runner.cRemoved || runner.ExitCode != nil
+ runner.cStateLock.Unlock()
+ if done {
+ return
+ } else if err != nil {
+ runner.CrunchLog.Printf("Error inspecting container: %s", err)
+ runner.checkBrokenNode(err)
+ return
+ } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
+ runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State)
+ return
}
- }()
- }
+ }
+ }()
for {
select {
runner.stop(nil)
runTimeExceeded = nil
- case err := <-containerdGone:
- return err
+ case <-containerGone:
+ return errors.New("docker client never returned status")
}
}
}
}
var updated arvados.Container
- err = runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
"container": arvadosclient.Dict{"log": saved.PortableDataHash},
}, &updated)
if err != nil {
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
// Output may have been set directly by the container, so
// refresh the container record to check.
- err := runner.ArvClient.Get("containers", runner.Container.UUID,
+ err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
nil, &runner.Container)
if err != nil {
return err
}
txt, err := (&copier{
- client: runner.client,
- arvClient: runner.ArvClient,
- keepClient: runner.Kc,
+ client: runner.containerClient,
+ arvClient: runner.ContainerArvClient,
+ keepClient: runner.ContainerKeepClient,
hostOutputDir: runner.HostOutputDir,
ctrOutputDir: runner.Container.OutputPath,
binds: runner.Binds,
if err != nil {
return err
}
+ if n := len(regexp.MustCompile(` [0-9a-f]+\+\S*\+R`).FindAllStringIndex(txt, -1)); n > 0 {
+ runner.CrunchLog.Printf("Copying %d data blocks from remote input collections...", n)
+ fs, err := (&arvados.Collection{ManifestText: txt}).FileSystem(runner.containerClient, runner.ContainerKeepClient)
+ if err != nil {
+ return err
+ }
+ txt, err = fs.MarshalManifest(".")
+ if err != nil {
+ return err
+ }
+ }
var resp arvados.Collection
- err = runner.ArvClient.Create("collections", arvadosclient.Dict{
+ err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
"ensure_unique_name": true,
"collection": arvadosclient.Dict{
"is_trashed": true,
// other further errors (such as failing to write the log to Keep!)
// while shutting down
runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
- ArvClient: runner.ArvClient,
+ ArvClient: runner.DispatcherArvClient,
UUID: runner.Container.UUID,
loggingStream: "crunch-run",
writeCloser: nil,
reqBody := arvadosclient.Dict{"collection": updates}
if runner.logUUID == "" {
reqBody["ensure_unique_name"] = true
- err = runner.ArvClient.Create("collections", reqBody, &response)
+ err = runner.DispatcherArvClient.Create("collections", reqBody, &response)
} else {
- err = runner.ArvClient.Update("collections", runner.logUUID, reqBody, &response)
+ err = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
}
if err != nil {
return
if runner.cCancelled {
return ErrCancelled
}
- return runner.ArvClient.Update("containers", runner.Container.UUID,
+ return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
}
}
var auth arvados.APIClientAuthorization
- err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
+ err := runner.DispatcherArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
if err != nil {
return "", err
}
update["output"] = *runner.OutputPDH
}
}
- return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
+ return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
}
// IsCancelled returns the value of Cancelled, with goroutine safety.
return nil, err
}
return &ArvLogWriter{
- ArvClient: runner.ArvClient,
+ ArvClient: runner.DispatcherArvClient,
UUID: runner.Container.UUID,
loggingStream: name,
writeCloser: writer,
return
}
- // Sanity check that containerd is running.
- err = runner.CheckContainerd()
- if err != nil {
- return
- }
-
// check for and/or load image
err = runner.LoadImage()
if err != nil {
// Fetch the current container record (uuid = runner.Container.UUID)
// into runner.Container.
func (runner *ContainerRunner) fetchContainerRecord() error {
- reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
+ reader, err := runner.DispatcherArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
if err != nil {
return fmt.Errorf("error fetching container record: %v", err)
}
return fmt.Errorf("error getting container token: %v", err)
}
- containerClient, err := runner.MkArvClient(containerToken)
+ runner.ContainerArvClient, runner.ContainerKeepClient,
+ runner.containerClient, err = runner.MkArvClient(containerToken)
if err != nil {
return fmt.Errorf("error creating container API client: %v", err)
}
- err = containerClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
+ err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
if err != nil {
if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {
return fmt.Errorf("error fetching secret_mounts: %v", err)
}
// NewContainerRunner creates a new container runner.
-func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClient, docker ThinDockerClient, containerUUID string) (*ContainerRunner, error) {
+func NewContainerRunner(dispatcherClient *arvados.Client,
+ dispatcherArvClient IArvadosClient,
+ dispatcherKeepClient IKeepClient,
+ docker ThinDockerClient,
+ containerUUID string) (*ContainerRunner, error) {
+
cr := &ContainerRunner{
- client: client,
- ArvClient: api,
- Kc: kc,
- Docker: docker,
+ dispatcherClient: dispatcherClient,
+ DispatcherArvClient: dispatcherArvClient,
+ DispatcherKeepClient: dispatcherKeepClient,
+ Docker: docker,
}
cr.NewLogWriter = cr.NewArvLogWriter
cr.RunArvMount = cr.ArvMountCmd
cr.MkTempDir = ioutil.TempDir
- cr.ListProcesses = func() ([]PsProcess, error) {
- pr, err := process.Processes()
- if err != nil {
- return nil, err
- }
- ps := make([]PsProcess, len(pr))
- for i, j := range pr {
- ps[i] = j
- }
- return ps, nil
- }
- cr.MkArvClient = func(token string) (IArvadosClient, error) {
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
cl, err := arvadosclient.MakeArvadosClient()
if err != nil {
- return nil, err
+ return nil, nil, nil, err
}
cl.ApiToken = token
- return cl, nil
+ kc, err := keepclient.MakeKeepClient(cl)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ c2 := arvados.NewClientFromEnv()
+ c2.AuthToken = token
+ return cl, kc, c2, nil
}
var err error
- cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.client, cr.Kc)
+ cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.dispatcherClient, cr.DispatcherKeepClient)
if err != nil {
return nil, err
}
cr.CrunchLog = NewThrottledLogger(w)
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
- loadLogThrottleParams(api)
+ loadLogThrottleParams(dispatcherArvClient)
go cr.updateLogs()
return cr, nil
`)
memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
getVersion := flag.Bool("version", false, "Print version information and exit.")
- checkContainerd := flag.Duration("check-containerd", 60*time.Second, "Periodic check if (docker-)containerd is running (use 0s to disable).")
+ flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
detached := false
if len(os.Args) > 1 && os.Args[1] == "-detached" {
os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
detached = true
}
+
flag.Parse()
switch {
cr.expectCgroupParent = *cgroupParent
cr.enableNetwork = *enableNetwork
cr.networkMode = *networkMode
- cr.checkContainerd = *checkContainerd
if *cgroupParentSubsystem != "" {
p := findCgroup(*cgroupParentSubsystem)
cr.setCgroupParent = p
type TestSuite struct {
client *arvados.Client
docker *TestDockerClient
+ runner *ContainerRunner
}
func (s *TestSuite) SetUpTest(c *C) {
api *ArvTestClient
realTemp string
calledWait bool
+ ctrExited bool
}
func NewTestDockerClient() *TestDockerClient {
return body, err
}
+func (t *TestDockerClient) ContainerInspect(ctx context.Context, id string) (c dockertypes.ContainerJSON, err error) {
+ c.ContainerJSONBase = &dockertypes.ContainerJSONBase{}
+ c.ID = "abcde"
+ if t.ctrExited {
+ c.State = &dockertypes.ContainerState{Status: "exited", Dead: true}
+ } else {
+ c.State = &dockertypes.ContainerState{Status: "running", Pid: 1234, Running: true}
+ }
+ return
+}
+
func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
if t.exitCode == 2 {
return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
return nil
}
+func (client *KeepTestClient) LocalLocator(locator string) (string, error) {
+ return locator, nil
+}
+
func (client *KeepTestClient) PutB(buf []byte) (string, int, error) {
client.Content = buf
return fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf)), len(buf), nil
}
func (s *TestSuite) TestLoadImage(c *C) {
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{},
+ &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
+
kc := &KeepTestClient{}
defer kc.Close()
- cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- c.Assert(err, IsNil)
+ cr.ContainerArvClient = &ArvTestClient{}
+ cr.ContainerKeepClient = kc
_, err = cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
c.Check(err, IsNil)
}
func (ArvErrorTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
+ if method == "GET" && resourceType == "containers" && action == "auth" {
+ return nil
+ }
return errors.New("ArvError")
}
return "", 0, errors.New("KeepError")
}
+func (*KeepErrorTestClient) LocalLocator(string) (string, error) {
+ return "", errors.New("KeepError")
+}
+
type KeepReadErrorTestClient struct {
KeepTestClient
}
// (1) Arvados error
kc := &KeepTestClient{}
defer kc.Close()
- cr, err := NewContainerRunner(s.client, ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
+
+ cr.ContainerArvClient = &ArvErrorTestClient{}
+ cr.ContainerKeepClient = &KeepTestClient{}
+
cr.Container.ContainerImage = hwPDH
err = cr.LoadImage()
func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
- cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ kc := &KeepErrorTestClient{}
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
+
+ cr.ContainerArvClient = &ArvTestClient{}
+ cr.ContainerKeepClient = &KeepErrorTestClient{}
+
cr.Container.ContainerImage = hwPDH
err = cr.LoadImage()
func (s *TestSuite) TestLoadImageCollectionError(c *C) {
// (3) Collection doesn't contain image
- cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepReadErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ kc := &KeepReadErrorTestClient{}
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
cr.Container.ContainerImage = otherPDH
+ cr.ContainerArvClient = &ArvTestClient{}
+ cr.ContainerKeepClient = &KeepReadErrorTestClient{}
+
err = cr.LoadImage()
c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar")
}
func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
// (4) Keep read error
- cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ kc := &KeepReadErrorTestClient{}
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
cr.Container.ContainerImage = hwPDH
+ cr.ContainerArvClient = &ArvTestClient{}
+ cr.ContainerKeepClient = &KeepReadErrorTestClient{}
err = cr.LoadImage()
c.Check(err, NotNil)
cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
+ cr.ContainerArvClient = &ArvTestClient{}
+ cr.ContainerKeepClient = &KeepTestClient{}
+
var logs TestLogs
cr.NewLogWriter = logs.NewTestLoggingWriter
cr.Container.ContainerImage = hwPDH
defer kc.Close()
cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
+ s.runner = cr
cr.statInterval = 100 * time.Millisecond
+ cr.containerWatchdogInterval = time.Second
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
}
return d, err
}
- cr.MkArvClient = func(token string) (IArvadosClient, error) {
- return &ArvTestClient{secretMounts: secretMounts}, nil
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
+ return &ArvTestClient{secretMounts: secretMounts}, &KeepTestClient{}, nil, nil
}
if extraMounts != nil && len(extraMounts) > 0 {
}
if exitCode != 2 {
c.Check(api.WasSetRunning, Equals, true)
- c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+ var lastupdate arvadosclient.Dict
+ for _, content := range api.Content {
+ if content["container"] != nil {
+ lastupdate = content["container"].(arvadosclient.Dict)
+ }
+ }
+ if lastupdate["log"] == nil {
+ c.Errorf("no container update with non-nil log -- updates were: %v", api.Content)
+ }
}
if err != nil {
c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*")
}
+func (s *TestSuite) TestContainerWaitFails(c *C) {
+ api, _, _ := s.fullRunHelper(c, `{
+ "command": ["sleep", "3"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1
+}`, nil, 0, func(t *TestDockerClient) {
+ t.ctrExited = true
+ time.Sleep(10 * time.Second)
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*")
+}
+
func (s *TestSuite) TestCrunchstat(c *C) {
api, _, _ := s.fullRunHelper(c, `{
"command": ["sleep", "1"],
cr, err := NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
- cr.MkArvClient = func(token string) (IArvadosClient, error) {
- return &ArvTestClient{}, nil
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
+ return &ArvTestClient{}, &KeepTestClient{}, nil, nil
}
setup(cr)
c.Assert(err, IsNil)
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
+ cr.ContainerArvClient = &ArvTestClient{}
+ cr.ContainerKeepClient = &KeepTestClient{}
realTemp, err := ioutil.TempDir("", "crunchrun_test1-")
c.Assert(err, IsNil)
"runtime_constraints": {}
}`
- api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+ api, cr, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
c.Check(api.CalledWith("container.state", "Complete"), NotNil)
- c.Check(api.CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
+ c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
}
// Used by the TestStdoutWithWrongPath*()
c.Assert(err, IsNil)
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
- cr.MkArvClient = func(token string) (IArvadosClient, error) {
- return &ArvTestClient{}, nil
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
+ return &ArvTestClient{}, &KeepTestClient{}, nil, nil
}
err = cr.Run()
extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
- api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ api, cr, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
c.Check(api.CalledWith("container.state", "Complete"), NotNil)
- c.Check(api.CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
+ c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
}
func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
}
func (s *TestSuite) TestStderrMount(c *C) {
- api, _, _ := s.fullRunHelper(c, `{
+ api, cr, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo hello;exit 1"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
- c.Check(api.CalledWith("collection.manifest_text", "./a b1946ac92492d2347c6235b4d2611184+6 0:6:out.txt\n./b 38af5c54926b620264ab1501150cf189+5 0:5:err.txt\n"), NotNil)
+ c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a b1946ac92492d2347c6235b4d2611184+6 0:6:out.txt\n./b 38af5c54926b620264ab1501150cf189+5 0:5:err.txt\n"), NotNil)
}
func (s *TestSuite) TestNumberRoundTrip(c *C) {
"runtime_constraints": {}
}`
- api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+ api, cr, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
content, err := ioutil.ReadFile(t.realTemp + "/tmp2/secret.conf")
c.Check(err, IsNil)
c.Check(content, DeepEquals, []byte("mypassword"))
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
c.Check(api.CalledWith("container.state", "Complete"), NotNil)
- c.Check(api.CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), NotNil)
- c.Check(api.CalledWith("collection.manifest_text", ""), IsNil)
+ c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), NotNil)
+ c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), IsNil)
// under secret mounts, not captured in output
helperRecord = `{
"runtime_constraints": {}
}`
- api, _, _ = s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+ api, cr, _ = s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
content, err := ioutil.ReadFile(t.realTemp + "/tmp2/secret.conf")
c.Check(err, IsNil)
c.Check(content, DeepEquals, []byte("mypassword"))
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
c.Check(api.CalledWith("container.state", "Complete"), NotNil)
- c.Check(api.CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), IsNil)
- c.Check(api.CalledWith("collection.manifest_text", ""), NotNil)
+ c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), IsNil)
+ c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), NotNil)
}
type FakeProcess struct {
func (fp FakeProcess) CmdlineSlice() ([]string, error) {
return fp.cmdLine, nil
}
-
-func (s *TestSuite) helpCheckContainerd(c *C, lp func() ([]PsProcess, error)) error {
- kc := &KeepTestClient{}
- defer kc.Close()
- cr, err := NewContainerRunner(s.client, &ArvTestClient{callraw: true}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- cr.checkContainerd = time.Duration(100 * time.Millisecond)
- c.Assert(err, IsNil)
- cr.ListProcesses = lp
-
- s.docker.fn = func(t *TestDockerClient) {
- time.Sleep(1 * time.Second)
- t.logWriter.Close()
- }
-
- err = cr.CreateContainer()
- c.Check(err, IsNil)
-
- err = cr.StartContainer()
- c.Check(err, IsNil)
-
- err = cr.WaitFinish()
- return err
-
-}
-
-func (s *TestSuite) TestCheckContainerdPresent(c *C) {
- err := s.helpCheckContainerd(c, func() ([]PsProcess, error) {
- return []PsProcess{FakeProcess{[]string{"docker-containerd"}}}, nil
- })
- c.Check(err, IsNil)
-}
-
-func (s *TestSuite) TestCheckContainerdMissing(c *C) {
- err := s.helpCheckContainerd(c, func() ([]PsProcess, error) {
- return []PsProcess{FakeProcess{[]string{"abc"}}}, nil
- })
- c.Check(err, ErrorMatches, `'containerd' not found in process list.`)
-}
ExecStart=/usr/bin/keep-balance -commit-pulls -commit-trash
Restart=always
RestartSec=10s
+Nice=19
# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
StartLimitInterval=0
cmd: "move testfile newdir0/\n",
match: `(?ms).*Moving .* failed.*`,
},
+ {
+ path: writePath,
+ cmd: "lock newdir0/testfile\n",
+ match: `(?ms).*Locking .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "unlock newdir0/testfile\nasdf\n",
+ match: `(?ms).*Unlocking .* succeeded.*`,
+ },
{
path: writePath,
cmd: "ls\n",
cmd: "mkcol newdir1\n",
match: `(?ms).*Creating .* succeeded.*`,
},
+ {
+ path: writePath,
+ cmd: "move newdir1/ newdir1x/\n",
+ match: `(?ms).*Moving .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "move newdir1x newdir1\n",
+ match: `(?ms).*Moving .* succeeded.*`,
+ },
{
path: writePath,
cmd: "move newdir0/testfile newdir1/\n",
cmd: "delete foo\n",
match: `(?ms).*Deleting .* failed:.*405 Method Not Allowed.*`,
},
+ {
+ path: pdhPath,
+ cmd: "lock foo\n",
+ match: `(?ms).*Locking .* failed:.*405 Method Not Allowed.*`,
+ },
} {
c.Logf("%s %+v", "http://"+s.testServer.Addr, trial)
if skip != nil && skip(trial.path) {
writeMethod = map[string]bool{
"COPY": true,
"DELETE": true,
+ "LOCK": true,
"MKCOL": true,
"MOVE": true,
"PUT": true,
"RMCOL": true,
+ "UNLOCK": true,
}
webdavMethod = map[string]bool{
"COPY": true,
"DELETE": true,
+ "LOCK": true,
"MKCOL": true,
"MOVE": true,
"OPTIONS": true,
"PROPFIND": true,
"PUT": true,
"RMCOL": true,
+ "UNLOCK": true,
}
browserMethod = map[string]bool{
"GET": true,
if !fs.writing {
return errReadOnly
}
+ if strings.HasSuffix(oldName, "/") {
+ // WebDAV "MOVE foo/ bar/" means rename foo to bar.
+ oldName = oldName[:len(oldName)-1]
+ newName = strings.TrimSuffix(newName, "/")
+ }
fs.makeparents(newName)
return fs.collfs.Rename(oldName, newName)
}
remoteID := part[1:6]
remote, ok := cluster.RemoteClusters[remoteID]
if !ok {
- http.Error(w, "remote cluster not configured", http.StatusBadGateway)
+ http.Error(w, "remote cluster not configured", http.StatusBadRequest)
return
}
kc, err := rp.remoteClient(remoteID, remote, token)
import (
"bytes"
"context"
+ "crypto/sha256"
"encoding/base64"
"encoding/hex"
"flag"
return err
}
opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
+ // In AWS regions that use V4 signatures, we need to
+ // provide ContentSHA256 up front. Otherwise, the S3
+ // library reads the request body (from our buffer)
+ // into another new buffer in order to compute the
+ // SHA256 before sending the request -- which would
+ // mean consuming 128 MiB of memory for the duration
+ // of a 64 MiB write.
+ opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
}
// Send the block data through a pipe, so that (if we need to)
Bucket: v.bucket.Bucket,
Prefix: prefix,
PageSize: v.IndexPageSize,
+ Stats: &v.bucket.stats,
}
recentL := s3Lister{
Bucket: v.bucket.Bucket,
Prefix: "recent/" + prefix,
PageSize: v.IndexPageSize,
+ Stats: &v.bucket.stats,
}
- v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
- v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
- for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
- v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+ for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
if data.Key >= "g" {
// Conveniently, "recent/*" and "trash/*" are
// lexically greater than all hex-encoded data
stamp := data
// Advance to the corresponding recent/X marker, if any
- for recent != nil {
+ for recent != nil && recentL.Error() == nil {
if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
recent = recentL.Next()
- v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
continue
} else if cmp == 0 {
stamp = recent
recent = recentL.Next()
- v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
break
} else {
// recent/X marker is missing: we'll
break
}
}
+ if err := recentL.Error(); err != nil {
+ return err
+ }
t, err := time.Parse(time.RFC3339, stamp.LastModified)
if err != nil {
return err
}
fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
}
- return nil
+ return dataL.Error()
}
// Trash a Keep block.
MetadataDirective: "REPLACE",
}, v.bucket.Name+"/"+src)
err = v.translateError(err)
- if err != nil {
+ if os.IsNotExist(err) {
return err
+ } else if err != nil {
+ return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
}
if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
Bucket: v.bucket.Bucket,
Prefix: "trash/",
PageSize: v.IndexPageSize,
+ Stats: &v.bucket.stats,
}
for trash := trashL.First(); trash != nil; trash = trashL.Next() {
todo <- trash
Bucket *s3.Bucket
Prefix string
PageSize int
+ Stats *s3bucketStats
nextMarker string
buf []s3.Key
err error
}
func (lister *s3Lister) getPage() {
+ lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
lister.nextMarker = ""
if err != nil {
for _, volume := range volumes {
mtime, err := volume.Mtime(trashRequest.Locator)
if err != nil {
- log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
+ log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
continue
}
if trashRequest.BlockMtime != mtime.UnixNano() {
- log.Printf("%v Delete(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
+ log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
continue
}
if !theConfig.EnableDelete {
- err = errors.New("did not delete block because EnableDelete is false")
+ err = errors.New("skipping because EnableDelete is false")
} else {
err = volume.Trash(trashRequest.Locator)
}
if err != nil {
- log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
+ log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
} else {
- log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
+ log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator)
}
}
}
fi
}
+getclusterid() {
+ docker exec $ARVBOX_CONTAINER cat /var/lib/arvados/api_uuid_prefix
+}
+
updateconf() {
if test -f ~/.config/arvados/$ARVBOX_CONTAINER.conf ; then
sed "s/ARVADOS_API_HOST=.*/ARVADOS_API_HOST=$(gethost):8000/" <$HOME/.config/arvados/$ARVBOX_CONTAINER.conf >$HOME/.config/arvados/$ARVBOX_CONTAINER.conf.tmp
;;
status)
- echo "Selected: $ARVBOX_CONTAINER"
+ echo "Container: $ARVBOX_CONTAINER"
if docker ps -a --filter "status=running" | grep -E "$ARVBOX_CONTAINER$" -q ; then
+ echo "Cluster id: $(getclusterid)"
echo "Status: running"
echo "Container IP: $(getip)"
echo "Published host: $(gethost)"
sv)
if test -n "$1" -a -n "$2" ; then
- exec docker exec -ti $ARVBOX_CONTAINER sv "$@"
+ exec docker exec $ARVBOX_CONTAINER sv "$@"
else
echo "Usage: $0 $subcmd <start|stop|restart> <service>"
echo "Available services:"
- exec docker exec -ti $ARVBOX_CONTAINER ls /etc/service
- exec docker exec $ARVBOX_CONTAINER ls /etc/service
fi
;;
"ignore": "test",
"package": [
{
- "checksumSHA1": "jf7K+UTQNIzRdlG5F4zX/8b++/E=",
+ "checksumSHA1": "j4je0EzPGzjb6INLY1BHZ+hyMjc=",
"origin": "github.com/curoverse/goamz/aws",
"path": "github.com/AdRoll/goamz/aws",
- "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+ "revision": "888b4804f2653cd35ebcc95f046079e63b5b2799",
"revisionTime": "2017-07-27T13:52:37Z"
},
{
- "checksumSHA1": "9nUwQXI+pNxZo6bnR7NslpMpfPI=",
+ "checksumSHA1": "0+n3cT6e7sQCCbBAH8zg6neiHTk=",
"origin": "github.com/curoverse/goamz/s3",
"path": "github.com/AdRoll/goamz/s3",
- "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+ "revision": "888b4804f2653cd35ebcc95f046079e63b5b2799",
"revisionTime": "2017-07-27T13:52:37Z"
},
{
"checksumSHA1": "tvxbsTkdjB0C/uxEglqD6JfVnMg=",
"origin": "github.com/curoverse/goamz/s3/s3test",
"path": "github.com/AdRoll/goamz/s3/s3test",
- "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+ "revision": "888b4804f2653cd35ebcc95f046079e63b5b2799",
"revisionTime": "2017-07-27T13:52:37Z"
},
{
"revision": "d682213848ed68c0a260ca37d6dd5ace8423f5ba",
"revisionTime": "2017-12-05T20:32:29Z"
},
- {
- "checksumSHA1": "st4vb0GmDeoKbsfxdpNZ2MPl76M=",
- "path": "github.com/StackExchange/wmi",
- "revision": "cdffdb33acae0e14efff2628f9bae377b597840e",
- "revisionTime": "2018-04-12T20:51:11Z"
- },
{
"checksumSHA1": "spyv5/YFBjYyZLZa1U2LBfDR8PM=",
"path": "github.com/beorn7/perks/quantile",
"revision": "0ca9ea5df5451ffdf184b4428c902747c2c11cd7",
"revisionTime": "2017-03-27T23:54:44Z"
},
- {
- "checksumSHA1": "Kqv7bA4oJG0nPwQvGWDwGGaKONo=",
- "path": "github.com/go-ole/go-ole",
- "revision": "7a0fa49edf48165190530c675167e2f319a05268",
- "revisionTime": "2018-06-25T08:58:08Z"
- },
- {
- "checksumSHA1": "PArleDBtadu2qO4hJwHR8a3IOTA=",
- "path": "github.com/go-ole/go-ole/oleutil",
- "revision": "7a0fa49edf48165190530c675167e2f319a05268",
- "revisionTime": "2018-06-25T08:58:08Z"
- },
{
"checksumSHA1": "8UEp6v0Dczw/SlasE0DivB0mAHA=",
"path": "github.com/gogo/protobuf/jsonpb",
"revision": "d14ea06fba99483203c19d92cfcd13ebe73135f4",
"revisionTime": "2015-07-11T00:45:18Z"
},
+ {
+ "checksumSHA1": "khL6oKjx81rAZKW+36050b7f5As=",
+ "path": "github.com/jmcvetta/randutil",
+ "revision": "2bb1b664bcff821e02b2a0644cd29c7e824d54f8",
+ "revisionTime": "2015-08-17T12:26:01Z"
+ },
{
"checksumSHA1": "oX6jFQD74oOApvDIhOzW2dXpg5Q=",
"path": "github.com/kevinburke/ssh_config",
"revision": "1744e2970ca51c86172c8190fadad617561ed6e7",
"revisionTime": "2017-11-10T11:01:46Z"
},
- {
- "checksumSHA1": "q14d3C3xvWevU3dSv4P5K0+OSD0=",
- "path": "github.com/shirou/gopsutil/cpu",
- "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
- "revisionTime": "2018-07-05T13:28:12Z"
- },
- {
- "checksumSHA1": "LZ9GloiGLTISmQ4dalK2XspH6Wo=",
- "path": "github.com/shirou/gopsutil/host",
- "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
- "revisionTime": "2018-07-05T13:28:12Z"
- },
- {
- "checksumSHA1": "cyoqI0gryzjxGTkaAfyUqMiuUR0=",
- "path": "github.com/shirou/gopsutil/internal/common",
- "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
- "revisionTime": "2018-07-05T13:28:12Z"
- },
- {
- "checksumSHA1": "vEQLjAO5T5K9zXblEMYdoaBZzj0=",
- "path": "github.com/shirou/gopsutil/mem",
- "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
- "revisionTime": "2018-07-05T13:28:12Z"
- },
- {
- "checksumSHA1": "KMWFRa0DVpabo9d8euB4RYjUBQE=",
- "path": "github.com/shirou/gopsutil/net",
- "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
- "revisionTime": "2018-07-05T13:28:12Z"
- },
- {
- "checksumSHA1": "fbO7c1gv1kSvWKOb/+5HUWFkBaA=",
- "path": "github.com/shirou/gopsutil/process",
- "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
- "revisionTime": "2018-07-05T13:28:12Z"
- },
- {
- "checksumSHA1": "Nve7SpDmjsv6+rhkXAkfg/UQx94=",
- "path": "github.com/shirou/w32",
- "revision": "bb4de0191aa41b5507caa14b0650cdbddcd9280b",
- "revisionTime": "2016-09-30T03:27:40Z"
- },
{
"checksumSHA1": "8QeSG127zQqbA+YfkO1WkKx/iUI=",
"path": "github.com/src-d/gcfg",