15348: Merge branch 'master'
authorTom Clegg <tom@tomclegg.ca>
Wed, 1 Jul 2020 19:01:38 +0000 (15:01 -0400)
committerTom Clegg <tom@tomclegg.ca>
Wed, 1 Jul 2020 19:01:38 +0000 (15:01 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

55 files changed:
apps/workbench/Gemfile.lock
build/package-testing/test-package-python3-arvados-python-client.sh [new file with mode: 0755]
build/package-testing/test-package-python3-python-arvados-fuse.sh [new symlink]
build/package-testing/test-package-rh-python36-python-arvados-python-client.sh [new file with mode: 0755]
build/run-build-packages.sh
build/run-library.sh
doc/_config.yml
doc/admin/link-accounts.html.textile.liquid [new file with mode: 0644]
doc/api/methods/users.html.textile.liquid
doc/install/install-shell-server.html.textile.liquid
doc/install/install-webshell.html.textile.liquid [new file with mode: 0644]
doc/user/reference/api-tokens.html.textile.liquid
docker/jobs/Dockerfile
docker/jobs/apt.arvados.org-dev.list
docker/jobs/apt.arvados.org-stable.list
docker/jobs/apt.arvados.org-testing.list
lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
lib/controller/federation.go
lib/controller/federation/federation_test.go
lib/controller/federation_test.go
lib/controller/handler.go
lib/controller/handler_test.go
lib/controller/localdb/db.go [new file with mode: 0644]
lib/controller/localdb/db_test.go [new file with mode: 0644]
lib/controller/localdb/docker_test.go [new file with mode: 0644]
lib/controller/localdb/login.go
lib/controller/localdb/login_ldap_docker_test.go
lib/controller/localdb/login_ldap_docker_test.sh
lib/controller/localdb/login_ldap_test.go
lib/controller/localdb/login_oidc_test.go
lib/controller/proxy.go
lib/controller/router/router.go
lib/controller/router/router_test.go
lib/recovercollection/cmd.go
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/tests/arvados-tests.yml
sdk/cwl/tests/wf-defaults/default-dir4.cwl
sdk/cwl/tests/wf-defaults/default-dir8.cwl [new file with mode: 0644]
sdk/cwl/tests/wf-defaults/wf4.cwl
sdk/cwl/tests/wf-defaults/wf8.cwl [new file with mode: 0644]
sdk/go/arvados/client.go
sdk/go/arvados/config.go
sdk/go/arvados/keep_service.go
sdk/go/arvados/keep_service_test.go
services/api/Gemfile.lock
services/api/lib/fix_roles_projects.rb
services/api/test/fixtures/links.yml
services/api/test/unit/group_test.rb
services/fuse/fpm-info.sh
services/keep-balance/balance.go
services/keep-balance/collection.go
services/keep-balance/collection_test.go
services/keep-balance/keep_service.go

index 2420fee24d07e056d3ee4b7047f43f87dd1b5d6d..cb4e7ab9e334cb8fdb0ae72c20ee841f4fed02b2 100644 (file)
@@ -214,7 +214,7 @@ GEM
       multi_json (~> 1.0)
       websocket-driver (>= 0.2.0)
     public_suffix (4.0.3)
-    rack (2.2.2)
+    rack (2.2.3)
     rack-mini-profiler (1.0.2)
       rack (>= 1.2.0)
     rack-test (0.6.3)
diff --git a/build/package-testing/test-package-python3-arvados-python-client.sh b/build/package-testing/test-package-python3-arvados-python-client.sh
new file mode 100755 (executable)
index 0000000..d4e66a2
--- /dev/null
@@ -0,0 +1,13 @@
+#!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+set -e
+
+arv-put --version
+
+/usr/share/python3/dist/python3-arvados-python-client/bin/python3 << EOF
+import arvados
+print("Successfully imported arvados")
+EOF
diff --git a/build/package-testing/test-package-python3-python-arvados-fuse.sh b/build/package-testing/test-package-python3-python-arvados-fuse.sh
new file mode 120000 (symlink)
index 0000000..3b9232c
--- /dev/null
@@ -0,0 +1 @@
+test-package-python27-python-arvados-fuse.sh
\ No newline at end of file
diff --git a/build/package-testing/test-package-rh-python36-python-arvados-python-client.sh b/build/package-testing/test-package-rh-python36-python-arvados-python-client.sh
new file mode 100755 (executable)
index 0000000..1a69256
--- /dev/null
@@ -0,0 +1,13 @@
+#!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+set -e
+
+arv-put --version
+
+/usr/share/python3/dist/rh-python36-python-arvados-python-client/bin/python3 << EOF
+import arvados
+print("Successfully imported arvados")
+EOF
index 1f855a77394294fe343f1200bf79d517662cb3b9..2d1d0bd19e041017edc473ea6556c81f9f0ed9ab 100755 (executable)
@@ -336,6 +336,9 @@ fpm_build_virtualenv "libpam-arvados" "sdk/pam"
 # The FUSE driver
 fpm_build_virtualenv "arvados-fuse" "services/fuse"
 
+# The FUSE driver - Python3 package
+fpm_build_virtualenv "arvados-fuse" "services/fuse" "python3"
+
 # The node manager
 fpm_build_virtualenv "arvados-node-manager" "services/nodemanager"
 
index e14458ad99f41d0ff4b2ec3014c79b236e28089c..1971a33d0590fdc6e71ba96101adf697db0eb1c0 100755 (executable)
@@ -601,7 +601,7 @@ fpm_build_virtualenv () {
   cd build/usr/share/$python/dist/$PYTHON_PKG/
 
   # Replace the shebang lines in all python scripts, and handle the activate
-  # scripts too This is a functional replacement of the 237 line
+  # scripts too. This is a functional replacement of the 237 line
   # virtualenv_tools.py script that doesn't work in python3 without serious
   # patching, minus the parts we don't need (modifying pyc files, etc).
   for binfile in `ls bin/`; do
@@ -659,7 +659,7 @@ fpm_build_virtualenv () {
     COMMAND_ARR+=('--rpm-auto-add-directories')
   fi
 
-  if [[ "$PKG" == "arvados-python-client" ]]; then
+  if [[ "$PKG" == "arvados-python-client" ]] || [[ "$PKG" == "arvados-fuse" ]]; then
     if [[ "$python" == "python2.7" ]]; then
       COMMAND_ARR+=('--conflicts' "$PYTHON3_PKG_PREFIX-$PKG")
     else
index 3b59cbca45205983ba4b83429f06b914946a53dd..be52a204c02d4e9548eeaa1139ff8126cff4f400 100644 (file)
@@ -153,8 +153,9 @@ navbar:
       - admin/index.html.textile.liquid
     - Users and Groups:
       - admin/user-management.html.textile.liquid
-      - admin/reassign-ownership.html.textile.liquid
       - admin/user-management-cli.html.textile.liquid
+      - admin/reassign-ownership.html.textile.liquid
+      - admin/link-accounts.html.textile.liquid
       - admin/group-management.html.textile.liquid
       - admin/federation.html.textile.liquid
       - admin/merge-remote-account.html.textile.liquid
@@ -217,6 +218,7 @@ navbar:
       - install/install-ws.html.textile.liquid
       - install/install-arv-git-httpd.html.textile.liquid
       - install/install-shell-server.html.textile.liquid
+      - install/install-webshell.html.textile.liquid
     - Containers API:
       - install/crunch2-slurm/install-compute-node.html.textile.liquid
       - install/install-jobs-image.html.textile.liquid
diff --git a/doc/admin/link-accounts.html.textile.liquid b/doc/admin/link-accounts.html.textile.liquid
new file mode 100644 (file)
index 0000000..d0ac6a0
--- /dev/null
@@ -0,0 +1,48 @@
+---
+layout: default
+navsection: admin
+title: "Link user accounts"
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+If a user needs to log in to Arvados with a different upstream account or provider, they may end up with two Arvados user accounts.  If the user still has the ability to log in with the old account, they can use the "self-serve account linking":{{site.baseurl}}/user/topics/link-accounts.html feature of workbench.  However, if the user does not have the ability to log in with both upstream accounts, the admin can also link the accounts using the command line.
+
+h3. Step 1: Determine user uuids
+
+User uuids can be determined by browsing workbench or using @arv user list@ at the command line.
+
+Account linking works by recording in the database that a login to the "old" account should be redirected and treated as a login to the "new" account.
+
+The "old" account is the Arvados account that will be redirected.
+
+The "new" account is the user that the "old" account is redirected to.  As part of account linking any Arvados records owned by the "old" account are also transferred to the "new" account.
+
+Counter-intuitively, if you do not want the account uuid of the user to change, the "new" account should be the pre-existing account, and the "old" account should be the redundant second account that was more recently created.  This means "old" and "new" are opposite from their expected chronological meaning.  In this case, the use of "old" and "new" reflect the direction of transfer of ownership -- the login was associated with the "old" user account, but will be associated with the "new" user account.
+
+In the example below, @zzzzz-tpzed-3kz0nwtjehhl0u4@ is the "old" account (the redundant account we want to merge into the existing account) and @zzzzz-tpzed-fr97h9t4m5jffxs@ is the "new" account (the pre-existing account we want to keep).
+
+h3. Step 2: Create a project
+
+Create a project owned by the "new" account that will hold any data owned by the "old" account.
+
+<pre>
+$ arv --format=uuid group create --group '{"group_class": "project", "name": "Data from old user", "owner_uuid": "zzzzz-tpzed-fr97h9t4m5jffxs"}'
+zzzzz-j7d0g-mczqiguhil13083
+</pre>
+
+h3. Step 3: Merge "old" user to "new" user
+
+The @user merge@ method redirects login and reassigns data from the "old" account to the "new" account.
+
+<pre>
+$ arv user merge --redirect-to-new-user \
+  --old-user-uuid=zzzzz-tpzed-3kz0nwtjehhl0u4 \
+  --new-user-uuid=zzzzz-tpzed-fr97h9t4m5jffxs \
+  --new-owner-uuid=zzzzz-j7d0g-mczqiguhil13083
+</pre>
+
+Note that authorization credentials (API tokens, ssh keys) are also transferred to the "new" account, so credentials used to access the "old" account work with the "new" account.
index 4c33f2afe820df5e662622b5880a9fd75f3561a6..cde189d6ffa341833cadd7cd08be32fd79146a7c 100644 (file)
@@ -154,3 +154,21 @@ Arguments:
 table(table table-bordered table-condensed).
 |_. Argument |_. Type |_. Description |_. Location |_. Example |
 {background:#ccffcc}.|uuid|string|The UUID of the User in question.|path||
+
+h3. merge
+
+Transfer ownership of data from the "old" user account to the "new" user account.  When @redirect_to_new_user@ is @true@ this also causes logins to the "old" account to be redirected to the "new" account.  The "old" user account that was redirected becomes invisible in user listings.
+
+See "Merge user accounts":{{site.baseurl}}/admin/link-accounts.html , "Reassign user data ownership":{{site.baseurl}}/admin/reassign-ownership.html and "Linking alternate login accounts":{{site.baseurl}}/user/topics/link-accounts.html for examples of how this method is used.
+
+Must supply either @new_user_token@ (the currently authorized user will be the "old" user), or both @new_user_uuid@ and @old_user_uuid@ (the currently authorized user must be an admin).
+
+Arguments:
+
+table(table table-bordered table-condensed).
+|_. Argument |_. Type |_. Description |_. Location |_. Example |
+|new_user_token|string|A valid token for the "new" user|query||
+|new_user_uuid|uuid|The uuid of the "new" account|query||
+|old_user_uuid|uuid|The uuid of the "old" account|query||
+|new_owner_uuid|uuid|The uuid of a project to which objects owned by the "old" user will be reassigned.|query||
+|redirect_to_new_user|boolean|If true, also redirect login and reassign authorization credentials from "old" user to the "new" user|query||
index 44b3834ab84ec8df76d4810c1ee76dbaf7fa0845..5ac5e9e6b870a2753287b2b8a59e50c6686d80df 100644 (file)
@@ -69,7 +69,7 @@ As an Arvados admin user (such as the system root user), create a "scoped token"
 
 <notextile>
 <pre>
-<code>apiserver:~$ <span class="userinput">arv api_client_authorization create --api-client-authorization '{"scopes":["GET /arvados/v1/virtual_machines/<b>zzzzz-2x53u-zzzzzzzzzzzzzzz</b>/logins"]}'
+<code>apiserver:~$ <span class="userinput">arv api_client_authorization create --api-client-authorization '{"scopes":["GET /arvados/v1/virtual_machines/<b>zzzzz-2x53u-zzzzzzzzzzzzzzz</b>/logins"]}'</span>
 {
  ...
  "api_token":"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
diff --git a/doc/install/install-webshell.html.textile.liquid b/doc/install/install-webshell.html.textile.liquid
new file mode 100644 (file)
index 0000000..4040fcf
--- /dev/null
@@ -0,0 +1,184 @@
+---
+layout: default
+navsection: installguide
+title: Configure webshell
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+# "Introduction":#introduction
+# "Prerequisites":#prerequisites
+# "Update config.yml":#configure
+# "Update nginx configuration":#update-nginx
+# "Install packages":#install-packages
+# "Configure shellinabox":#config-shellinabox
+# "Configure pam":#config-pam
+# "Confirm working installation":#confirm-working
+
+h2(#introduction). Introduction
+
+Arvados supports @webshell@, which allows ssh access to shell nodes via the browser. This functionality is integrated in @Workbench@.
+
+@Webshell@ is provided by the @shellinabox@ package which runs on each shell node for which webshell is enabled. For authentication, a supported @pam library@ that allows authentication against Arvados is also required. One Nginx (or similar web server) virtualhost is also needed to expose all the @shellinabox@ instances via https.
+
+h2(#prerequisites). Prerequisites
+
+# "Install workbench":{{site.baseurl}}/install/install-workbench-app.html
+# "Set up a shell node":{{site.baseurl}}/install/install-shell-server.html
+
+h2(#configure). Update config.yml
+
+Edit the cluster config at @config.yml@ and set @Services.WebShell.ExternalURL@.  Replace @ClusterID@ with your cluster id. Workbench will use this information to activate its support for webshell.
+
+<notextile>
+<pre><code>    Services:
+      WebShell:
+        InternalURLs: {}
+        ExternalURL: <span class="userinput">https://webshell.ClusterID.example.com/</span>
+</code></pre>
+</notextile>
+
+h2(#update-nginx). Update Nginx configuration
+
+The arvados-webshell service will be accessible from anywhere on the internet, so we recommend using SSL for transport encryption. This Nginx virtualhost could live on your Workbench server, or any other server that is reachable by your Workbench users and can access the @shell-in-a-box@ service on the shell node(s) on port 4200.
+
+Use a text editor to create a new file @/etc/nginx/conf.d/arvados-webshell.conf@ with the following configuration.  Options that need attention are marked in <span class="userinput">red</span>.
+
+<notextile><pre>
+upstream arvados-webshell {
+  server                <span class="userinput">shell.ClusterID.example.com</span>:<span class="userinput">4200</span>;
+}
+
+server {
+  listen                443 ssl;
+  server_name           webshell.<span class="userinput">ClusterID.example.com</span>;
+
+  proxy_connect_timeout 90s;
+  proxy_read_timeout    300s;
+
+  ssl                   on;
+  ssl_certificate       <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
+  ssl_certificate_key   <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
+
+  location /<span class="userinput">shell.ClusterID</span> {
+    if ($request_method = 'OPTIONS') {
+       add_header 'Access-Control-Allow-Origin' '*'; 
+       add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
+       add_header 'Access-Control-Allow-Headers' 'DNT,X-CustomHeader,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type';
+       add_header 'Access-Control-Max-Age' 1728000;
+       add_header 'Content-Type' 'text/plain; charset=UTF-8';
+       add_header 'Content-Length' 0;
+       return 204;
+    }
+    if ($request_method = 'POST') {
+       add_header 'Access-Control-Allow-Origin' '*';
+       add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
+       add_header 'Access-Control-Allow-Headers' 'DNT,X-CustomHeader,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type';
+    }
+    if ($request_method = 'GET') {
+       add_header 'Access-Control-Allow-Origin' '*';
+       add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
+       add_header 'Access-Control-Allow-Headers' 'DNT,X-CustomHeader,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type';
+    }
+
+    proxy_ssl_session_reuse off;
+    proxy_read_timeout  90;
+    proxy_set_header    X-Forwarded-Proto https;
+    proxy_set_header    Host $http_host;
+    proxy_set_header    X-Real-IP $remote_addr;
+    proxy_set_header    X-Forwarded-For $proxy_add_x_forwarded_for;
+    proxy_pass          http://arvados-webshell;
+  }
+}
+</pre></notextile>
+
+Note that the location line in the nginx config matches your shell node hostname *without domain*, because that is how the shell node was defined in the "Set up a shell node":{{site.baseurl}}/install/install-shell-server.html#vm-record instructions. It makes for a more user friendly experience in Workbench.
+
+For additional shell nodes with @shell-in-a-box@, add @location@ and @upstream@ sections as needed.
+
+{% assign arvados_component = 'shellinabox libpam-arvados' %}
+
+{% include 'install_packages' %}
+
+h2(#config-shellinabox). Configure shellinabox
+
+h3. Red Hat and Centos
+
+Edit @/etc/sysconfig/shellinaboxd@:
+
+<notextile><pre>
+# TCP port that shellinaboxd's webserver listens on
+PORT=4200
+
+# SSL is disabled because it is terminated in Nginx. Adjust as needed.
+OPTS="--disable-ssl --no-beep --service=/<span class="userinput">shell.ClusterID.example.com</span>:AUTH:HOME:SHELL"
+</pre></notextile>
+
+<notextile>
+<pre>
+<code># <span class="userinput">systemctl enable shellinabox</span></code>
+<code># <span class="userinput">systemctl start shellinabox</span></code>
+</pre>
+</notextile>
+
+h3. Debian and Ubuntu
+
+Edit @/etc/default/shellinabox@:
+
+<notextile><pre>
+# TCP port that shellinaboxd's webserver listens on
+SHELLINABOX_PORT=4200
+
+# SSL is disabled because it is terminated in Nginx. Adjust as needed.
+SHELLINABOX_ARGS="--disable-ssl --no-beep --service=/<span class="userinput">shell.ClusterID.example.com</span>:AUTH:HOME:SHELL"
+</pre></notextile>
+
+<notextile>
+<pre>
+<code># <span class="userinput">systemctl enable shellinabox</span></code>
+<code># <span class="userinput">systemctl start shellinabox</span></code>
+</pre>
+</notextile>
+
+
+h2(#config-pam). Configure pam
+
+Use a text editor to create a new file @/etc/pam.d/shellinabox@ with the following configuration. Options that need attention are marked in <span class="userinput">red</span>.
+
+<notextile><pre>
+# This example is a stock debian "login" file with libpam_arvados
+# replacing pam_unix, and the "noprompt" option in use. It can be
+# installed as /etc/pam.d/shellinabox .
+
+auth       optional   pam_faildelay.so  delay=3000000
+auth [success=ok new_authtok_reqd=ok ignore=ignore user_unknown=bad default=die] pam_securetty.so
+auth       requisite  pam_nologin.so
+session [success=ok ignore=ignore module_unknown=ignore default=bad] pam_selinux.so close
+session       required   pam_env.so readenv=1
+session       required   pam_env.so readenv=1 envfile=/etc/default/locale
+
+auth [success=1 default=ignore] pam_python.so /usr/lib/security/libpam_arvados.py <span class="userinput">ClusterID.example.com</span> <span class="userinput">shell.ClusterID.example.com</span> noprompt
+auth    requisite            pam_deny.so
+auth    required            pam_permit.so
+
+auth       optional   pam_group.so
+session    required   pam_limits.so
+session    optional   pam_lastlog.so
+session    optional   pam_motd.so  motd=/run/motd.dynamic
+session    optional   pam_motd.so
+session    optional   pam_mail.so standard
+
+@include common-account
+@include common-session
+@include common-password
+
+session [success=ok ignore=ignore module_unknown=ignore default=bad] pam_selinux.so open
+</pre></notextile>
+
+h2(#confirm-working). Confirm working installation
+
+A user should be able to log in to the shell server, using webshell via workbench. Please refer to "Accessing an Arvados VM with Webshell":{{site.baseurl}}/user/getting_started/vm-login-with-webshell.html
+
index d5172f0c5b79b68dccfd208be5918cb0f4b56cbe..6afc20bf4fd9071b7fa67cf9849960ea997bcb53 100644 (file)
@@ -9,7 +9,7 @@ Copyright (C) The Arvados Authors. All rights reserved.
 SPDX-License-Identifier: CC-BY-SA-3.0
 {% endcomment %}
 
-The Arvados API token is a secret key that enables the @arv@ command line client to access Arvados with the proper permissions.
+The Arvados API token is a secret key that enables the Arvados command line tools to authenticate themselves.
 
 Access the Arvados Workbench using this link: "{{site.arvados_workbench_host}}/":{{site.arvados_workbench_host}}/  (Replace the hostname portion with the hostname of your local Arvados instance if necessary.)
 
index 876ac4f9f49cea14c42f54f1ebe37423b4251cd2..15993c4bc322619e125ddb5411a79a2d0f4348f0 100644 (file)
@@ -3,8 +3,8 @@
 # SPDX-License-Identifier: Apache-2.0
 
 # Based on Debian Stretch
-FROM debian:stretch-slim
-MAINTAINER Peter Amstutz <peter.amstutz@curii.com>
+FROM debian:buster-slim
+MAINTAINER Arvados Package Maintainers <packaging@arvados.org>
 
 ENV DEBIAN_FRONTEND noninteractive
 
index 468000ed29b9244460e28f0b5abe5f5efd13f133..4de87397bca754a57e384c3155d88b82a30983fc 100644 (file)
@@ -1,2 +1,2 @@
 # apt.arvados.org
-deb http://apt.arvados.org/ stretch-dev main
+deb http://apt.arvados.org/ buster-dev main
index afbc51effe84979f49f5d1c9584bf951c2408922..7882afd01c96235b1fde32767d56a68aeada8d03 100644 (file)
@@ -1,2 +1,2 @@
 # apt.arvados.org
-deb http://apt.arvados.org/ stretch main
+deb http://apt.arvados.org/ buster main
index c8ea91d070a572365006e849015d48006d060a22..3bb599087eaf513bb5c3f6dc2e32d54108d3db53 100644 (file)
@@ -1,2 +1,2 @@
 # apt.arvados.org
-deb http://apt.arvados.org/ stretch-testing main
+deb http://apt.arvados.org/ buster-testing main
index b9bc9c2c5ce4a28eb25015961b687cea449d503a..907acdc87847f9c052aee71c5e1d1fbe8c4f78aa 100644 (file)
@@ -440,6 +440,13 @@ Clusters:
       # or omitted, pages are processed serially.
       BalanceCollectionBuffers: 1000
 
+      # Maximum time for a rebalancing run. This ensures keep-balance
+      # eventually gives up and retries if, for example, a network
+      # error causes a hung connection that is never closed by the
+      # OS. It should be long enough that it doesn't interrupt a
+      # long-running balancing operation.
+      BalanceTimeout: 6h
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index 0ad4222f551ba1220d2459f4330e9a1e05240d44..d6b02b750de122582e35a5aa34b508861106ac40 100644 (file)
@@ -102,6 +102,7 @@ var whitelist = map[string]bool{
        "Collections.WebDAVCache":                      false,
        "Collections.BalanceCollectionBatch":           false,
        "Collections.BalancePeriod":                    false,
+       "Collections.BalanceTimeout":                   false,
        "Collections.BlobMissingReport":                false,
        "Collections.BalanceCollectionBuffers":         false,
        "Containers":                                   true,
index 758dc2677cf233b0d4d61462e7ec73d607f69174..96da19dfcdc14c6e20f0d1ea348c2423f909b1ba 100644 (file)
@@ -446,6 +446,13 @@ Clusters:
       # or omitted, pages are processed serially.
       BalanceCollectionBuffers: 1000
 
+      # Maximum time for a rebalancing run. This ensures keep-balance
+      # eventually gives up and retries if, for example, a network
+      # error causes a hung connection that is never closed by the
+      # OS. It should be long enough that it doesn't interrupt a
+      # long-running balancing operation.
+      BalanceTimeout: 6h
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index ac239fb9b23f5c4c106034e2a910fb441b9c2218..aceaba8087ad2031413516c2671f75174c457fae 100644 (file)
@@ -152,7 +152,7 @@ type CurrentUser struct {
 // non-nil, true, nil -- if the token is valid
 func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, bool, error) {
        user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}}
-       db, err := h.db(req)
+       db, err := h.db(req.Context())
        if err != nil {
                ctxlog.FromContext(req.Context()).WithError(err).Debugf("validateAPItoken(%s): database error", token)
                return nil, false, err
@@ -189,7 +189,7 @@ func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUse
 }
 
 func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) {
-       db, err := h.db(req)
+       db, err := h.db(req.Context())
        if err != nil {
                return nil, err
        }
index f57d827848cbf772e4e7b9c3c2b8ac1e860f18e2..256afc8e6b9482d53eaa520927f62761a1f71b03 100644 (file)
@@ -64,7 +64,7 @@ func (s *FederationSuite) addDirectRemote(c *check.C, id string, backend backend
 
 func (s *FederationSuite) addHTTPRemote(c *check.C, id string, backend backend) {
        srv := httpserver.Server{Addr: ":"}
-       srv.Handler = router.New(backend)
+       srv.Handler = router.New(backend, nil)
        c.Check(srv.Start(), check.IsNil)
        s.cluster.RemoteClusters[id] = arvados.RemoteCluster{
                Scheme: "http",
index 2b0cb22b04fbed0fedcb282c4269dbb008bff1a5..6a9ad8c15f3db2132bf5c122d8ae639764dbfff7 100644 (file)
@@ -64,6 +64,7 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
        cluster.TLS.Insecure = true
        cluster.API.MaxItemsPerResponse = 1000
        cluster.API.MaxRequestAmplification = 4
+       cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
        arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
        arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
        s.testHandler = &Handler{Cluster: cluster}
index 01f2161632bf8e6562f51b4266e43602b90218c6..cc06246420559479203e24843164cee281e07633 100644 (file)
@@ -16,9 +16,11 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/controller/federation"
+       "git.arvados.org/arvados.git/lib/controller/localdb"
        "git.arvados.org/arvados.git/lib/controller/railsproxy"
        "git.arvados.org/arvados.git/lib/controller/router"
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        _ "github.com/lib/pq"
@@ -63,7 +65,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 
 func (h *Handler) CheckHealth() error {
        h.setupOnce.Do(h.setup)
-       _, _, err := railsproxy.FindRailsAPI(h.Cluster)
+       _, err := h.db(context.TODO())
+       if err != nil {
+               return err
+       }
+       _, _, err = railsproxy.FindRailsAPI(h.Cluster)
        return err
 }
 
@@ -78,10 +84,10 @@ func (h *Handler) setup() {
        mux.Handle("/_health/", &health.Handler{
                Token:  h.Cluster.ManagementToken,
                Prefix: "/_health/",
-               Routes: health.Routes{"ping": func() error { _, err := h.db(&http.Request{}); return err }},
+               Routes: health.Routes{"ping": func() error { _, err := h.db(context.TODO()); return err }},
        })
 
-       rtr := router.New(federation.New(h.Cluster))
+       rtr := router.New(federation.New(h.Cluster), localdb.WrapCallsInTransactions(h.db))
        mux.Handle("/arvados/v1/config", rtr)
        mux.Handle("/"+arvados.EndpointUserAuthenticate.Path, rtr)
 
@@ -115,7 +121,7 @@ func (h *Handler) setup() {
 
 var errDBConnection = errors.New("database connection error")
 
-func (h *Handler) db(req *http.Request) (*sql.DB, error) {
+func (h *Handler) db(ctx context.Context) (*sql.DB, error) {
        h.pgdbMtx.Lock()
        defer h.pgdbMtx.Unlock()
        if h.pgdb != nil {
@@ -124,14 +130,14 @@ func (h *Handler) db(req *http.Request) (*sql.DB, error) {
 
        db, err := sql.Open("postgres", h.Cluster.PostgreSQL.Connection.String())
        if err != nil {
-               httpserver.Logger(req).WithError(err).Error("postgresql connect failed")
+               ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
                return nil, errDBConnection
        }
        if p := h.Cluster.PostgreSQL.ConnectionPool; p > 0 {
                db.SetMaxOpenConns(p)
        }
        if err := db.Ping(); err != nil {
-               httpserver.Logger(req).WithError(err).Error("postgresql connect succeeded but ping failed")
+               ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
                return nil, errDBConnection
        }
        h.pgdb = db
index c7bce97130bfb0e4b327d3d2233a41d9c9c3b73d..ef6b9195f10be05b1dd69bcbedda800df66dfdb3 100644 (file)
@@ -52,6 +52,7 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
                PostgreSQL:       integrationTestCluster().PostgreSQL,
                ForceLegacyAPI14: forceLegacyAPI14,
        }
+       s.cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
        s.cluster.TLS.Insecure = true
        arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
        arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/")
diff --git a/lib/controller/localdb/db.go b/lib/controller/localdb/db.go
new file mode 100644 (file)
index 0000000..4f64e63
--- /dev/null
@@ -0,0 +1,116 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "sync"
+
+       "git.arvados.org/arvados.git/lib/controller/router"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+)
+
+// WrapCallsInTransactions returns a call wrapper (suitable for
+// assigning to router.router.WrapCalls) that starts a new transaction
+// for each API call, and commits only if the call succeeds.
+//
+// The wrapper calls getdb() to get a database handle before each API
+// call.
+func WrapCallsInTransactions(getdb func(context.Context) (*sql.DB, error)) func(router.RoutableFunc) router.RoutableFunc {
+       return func(origFunc router.RoutableFunc) router.RoutableFunc {
+               return func(ctx context.Context, opts interface{}) (_ interface{}, err error) {
+                       ctx, finishtx := starttx(ctx, getdb)
+                       defer finishtx(&err)
+                       return origFunc(ctx, opts)
+               }
+       }
+}
+
+// ContextWithTransaction returns a child context in which the given
+// transaction will be used by any localdb API call that needs one.
+// The caller is responsible for calling Commit or Rollback on tx.
+func ContextWithTransaction(ctx context.Context, tx *sql.Tx) context.Context {
+       txn := &transaction{tx: tx}
+       txn.setup.Do(func() {})
+       return context.WithValue(ctx, contextKeyTransaction, txn)
+}
+
+type contextKeyT string
+
+var contextKeyTransaction = contextKeyT("transaction")
+
+type transaction struct {
+       tx    *sql.Tx
+       err   error
+       getdb func(context.Context) (*sql.DB, error)
+       setup sync.Once
+}
+
+type transactionFinishFunc func(*error)
+
+// starttx returns a new child context that can be used with
+// currenttx(). It does not open a database transaction until the
+// first call to currenttx().
+//
+// The caller must eventually call the returned finishtx() func to
+// commit or rollback the transaction, if any.
+//
+//     func example(ctx context.Context) (err error) {
+//             ctx, finishtx := starttx(ctx, dber)
+//             defer finishtx(&err)
+//             // ...
+//             tx, err := currenttx(ctx)
+//             if err != nil {
+//                     return fmt.Errorf("example: %s", err)
+//             }
+//             return tx.ExecContext(...)
+//     }
+//
+// If *err is nil, finishtx() commits the transaction and assigns any
+// resulting error to *err.
+//
+// If *err is non-nil, finishtx() rolls back the transaction, and
+// does not modify *err.
+func starttx(ctx context.Context, getdb func(context.Context) (*sql.DB, error)) (context.Context, transactionFinishFunc) {
+       txn := &transaction{getdb: getdb}
+       return context.WithValue(ctx, contextKeyTransaction, txn), func(err *error) {
+               txn.setup.Do(func() {
+                       // Using (*sync.Once).Do() prevents a future
+                       // call to currenttx() from opening a
+                       // transaction which would never get committed
+                       // or rolled back. If currenttx() hasn't been
+                       // called before now, future calls will return
+                       // this error.
+                       txn.err = errors.New("refusing to start a transaction after wrapped function already returned")
+               })
+               if txn.tx == nil {
+                       // we never [successfully] started a transaction
+                       return
+               }
+               if *err != nil {
+                       ctxlog.FromContext(ctx).Debug("rollback")
+                       txn.tx.Rollback()
+                       return
+               }
+               *err = txn.tx.Commit()
+       }
+}
+
+func currenttx(ctx context.Context) (*sql.Tx, error) {
+       txn, ok := ctx.Value(contextKeyTransaction).(*transaction)
+       if !ok {
+               return nil, errors.New("bug: there is no transaction in this context")
+       }
+       txn.setup.Do(func() {
+               if db, err := txn.getdb(ctx); err != nil {
+                       txn.err = err
+               } else {
+                       txn.tx, txn.err = db.Begin()
+               }
+       })
+       return txn.tx, txn.err
+}
diff --git a/lib/controller/localdb/db_test.go b/lib/controller/localdb/db_test.go
new file mode 100644 (file)
index 0000000..5bab86c
--- /dev/null
@@ -0,0 +1,98 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+       "context"
+       "database/sql"
+       "sync"
+       "sync/atomic"
+
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       _ "github.com/lib/pq"
+       check "gopkg.in/check.v1"
+)
+
+// testdb returns a DB connection for the given cluster config.
+func testdb(c *check.C, cluster *arvados.Cluster) *sql.DB {
+       db, err := sql.Open("postgres", cluster.PostgreSQL.Connection.String())
+       c.Assert(err, check.IsNil)
+       return db
+}
+
+// testctx returns a context suitable for running a test case in a new
+// transaction, and a rollback func which the caller should call after
+// the test.
+func testctx(c *check.C, db *sql.DB) (ctx context.Context, rollback func()) {
+       tx, err := db.Begin()
+       c.Assert(err, check.IsNil)
+       return ContextWithTransaction(context.Background(), tx), func() {
+               c.Check(tx.Rollback(), check.IsNil)
+       }
+}
+
+// Register DatabaseSuite with the gocheck test runner.
+var _ = check.Suite(&DatabaseSuite{})
+
+// DatabaseSuite holds the tests for the transaction-in-context
+// helpers. It has no state of its own.
+type DatabaseSuite struct{}
+
+// TestTransactionContext checks that a func wrapped by
+// WrapCallsInTransactions gets one shared transaction no matter how
+// many times (or from how many goroutines) it calls currenttx(), and
+// that currenttx() refuses to start a transaction once the wrapped
+// func has returned.
+func (*DatabaseSuite) TestTransactionContext(c *check.C) {
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.IsNil)
+       cluster, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+
+       // Count how many times a database handle is requested.
+       var getterCalled int64
+       getter := func(context.Context) (*sql.DB, error) {
+               atomic.AddInt64(&getterCalled, 1)
+               return testdb(c, cluster), nil
+       }
+       wrapper := WrapCallsInTransactions(getter)
+
+       const ncalls = 20
+       wrappedFunc := wrapper(func(ctx context.Context, opts interface{}) (interface{}, error) {
+               // Call currenttx() concurrently, each time with a
+               // different child of the same parent context. All
+               // of the calls must return the same transaction.
+               txes := make([]*sql.Tx, ncalls)
+               var wg sync.WaitGroup
+               wg.Add(len(txes))
+               for i := range txes {
+                       go func(slot int) {
+                               defer wg.Done()
+                               childctx, cancel := context.WithCancel(ctx)
+                               defer cancel()
+                               tx, err := currenttx(childctx)
+                               c.Check(err, check.IsNil)
+                               txes[slot] = tx
+                       }(i)
+               }
+               wg.Wait()
+               // Compare each entry with its successor.
+               for i := 1; i < len(txes); i++ {
+                       c.Check(txes[i-1], check.Equals, txes[i])
+               }
+               return true, nil
+       })
+
+       ok, err := wrappedFunc(context.Background(), "blah")
+       c.Check(ok, check.Equals, true)
+       c.Check(err, check.IsNil)
+       c.Check(getterCalled, check.Equals, int64(1))
+
+       // If a wrapped func returns without ever calling
+       // currenttx(), a later currenttx() call on the saved context
+       // must fail rather than start a new transaction.
+       var savedctx context.Context
+       idle := wrapper(func(ctx context.Context, opts interface{}) (interface{}, error) {
+               savedctx = ctx
+               return true, nil
+       })
+       ok, err = idle(context.Background(), "blah")
+       c.Check(ok, check.Equals, true)
+       c.Check(err, check.IsNil)
+       tx, err := currenttx(savedctx)
+       c.Check(tx, check.IsNil)
+       c.Check(err, check.NotNil)
+}
diff --git a/lib/controller/localdb/docker_test.go b/lib/controller/localdb/docker_test.go
new file mode 100644 (file)
index 0000000..90c98b7
--- /dev/null
@@ -0,0 +1,68 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+       "io"
+       "net"
+       "strings"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       check "gopkg.in/check.v1"
+)
+
+// pgproxy wraps the net.Listener of a TCP proxy created by
+// newPgProxy. Embedding the listener makes its methods (e.g. Close
+// and Addr) available directly on the proxy.
+type pgproxy struct {
+       net.Listener
+}
+
+// newPgProxy sets up a TCP proxy, listening on all interfaces, that
+// forwards all connections to the cluster's PostgreSQL server. This
+// allows the caller to run a docker container that can connect to a
+// postgresql instance that listens on the test host's loopback
+// interface.
+//
+// Caller is responsible for calling Close() on the returned pgproxy.
+func newPgProxy(c *check.C, cluster *arvados.Cluster) *pgproxy {
+       host := cluster.PostgreSQL.Connection["host"]
+       if host == "" {
+               host = "localhost"
+       }
+       port := cluster.PostgreSQL.Connection["port"]
+       if port == "" {
+               port = "5432"
+       }
+       target := net.JoinHostPort(host, port)
+
+       ln, err := net.Listen("tcp", ":")
+       c.Assert(err, check.IsNil)
+       go func() {
+               for {
+                       downstream, err := ln.Accept()
+                       if err != nil && strings.Contains(err.Error(), "use of closed network connection") {
+                               return
+                       }
+                       c.Assert(err, check.IsNil)
+                       go func() {
+                               c.Logf("pgproxy accepted connection from %s", downstream.RemoteAddr().String())
+                               defer downstream.Close()
+                               upstream, err := net.Dial("tcp", target)
+                               if err != nil {
+                                       c.Logf("net.Dial(%q): %s", target, err)
+                                       return
+                               }
+                               defer upstream.Close()
+                               go io.Copy(downstream, upstream)
+                               io.Copy(upstream, downstream)
+                       }()
+               }
+       }()
+       c.Logf("pgproxy listening at %s", ln.Addr().String())
+       return &pgproxy{Listener: ln}
+}
+
+func (proxy *pgproxy) Port() string {
+       _, port, _ := net.SplitHostPort(proxy.Addr().String())
+       return port
+}
index 905cfed15c500d95689857e36c1c3323165c4d3d..1cd349a10eaa94d987899ac1315f811ffbf186e1 100644 (file)
@@ -6,9 +6,13 @@ package localdb
 
 import (
        "context"
+       "database/sql"
+       "encoding/json"
        "errors"
+       "fmt"
        "net/http"
        "net/url"
+       "strings"
 
        "git.arvados.org/arvados.git/lib/controller/rpc"
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -96,9 +100,9 @@ func noopLogout(cluster *arvados.Cluster, opts arvados.LogoutOptions) (arvados.L
        return arvados.LogoutResponse{RedirectLocation: target}, nil
 }
 
-func createAPIClientAuthorization(ctx context.Context, conn *rpc.Conn, rootToken string, authinfo rpc.UserSessionAuthInfo) (arvados.APIClientAuthorization, error) {
+func createAPIClientAuthorization(ctx context.Context, conn *rpc.Conn, rootToken string, authinfo rpc.UserSessionAuthInfo) (resp arvados.APIClientAuthorization, err error) {
        ctxRoot := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{rootToken}})
-       resp, err := conn.UserSessionCreate(ctxRoot, rpc.UserSessionCreateOptions{
+       newsession, err := conn.UserSessionCreate(ctxRoot, rpc.UserSessionCreateOptions{
                // Send a fake ReturnTo value instead of the caller's
                // opts.ReturnTo. We won't follow the resulting
                // redirect target anyway.
@@ -106,12 +110,36 @@ func createAPIClientAuthorization(ctx context.Context, conn *rpc.Conn, rootToken
                AuthInfo: authinfo,
        })
        if err != nil {
-               return arvados.APIClientAuthorization{}, err
+               return
        }
-       target, err := url.Parse(resp.RedirectLocation)
+       target, err := url.Parse(newsession.RedirectLocation)
        if err != nil {
-               return arvados.APIClientAuthorization{}, err
+               return
        }
        token := target.Query().Get("api_token")
-       return conn.APIClientAuthorizationCurrent(auth.NewContext(ctx, auth.NewCredentials(token)), arvados.GetOptions{})
+       tx, err := currenttx(ctx)
+       if err != nil {
+               return
+       }
+       tokensecret := token
+       if strings.Contains(token, "/") {
+               tokenparts := strings.Split(token, "/")
+               if len(tokenparts) >= 3 {
+                       tokensecret = tokenparts[2]
+               }
+       }
+       var exp sql.NullString
+       var scopes []byte
+       err = tx.QueryRowContext(ctx, "select uuid, api_token, expires_at, scopes from api_client_authorizations where api_token=$1", tokensecret).Scan(&resp.UUID, &resp.APIToken, &exp, &scopes)
+       if err != nil {
+               return
+       }
+       resp.ExpiresAt = exp.String
+       if len(scopes) > 0 {
+               err = json.Unmarshal(scopes, &resp.Scopes)
+               if err != nil {
+                       return resp, fmt.Errorf("unmarshal scopes: %s", err)
+               }
+       }
+       return
 }
index 79b5f16158ab0c39cba73822cb534bd9303ce24c..3cbf14fe0b0c11694bd95831179f3f0b7f6eb586 100644 (file)
@@ -24,10 +24,13 @@ func (s *LDAPSuite) TestLoginLDAPViaPAM(c *check.C) {
        if !haveDocker() {
                c.Skip("skipping docker test because docker is not available")
        }
+       pgproxy := newPgProxy(c, s.cluster)
+       defer pgproxy.Close()
+
        cmd := exec.Command("bash", "login_ldap_docker_test.sh")
        cmd.Stdout = os.Stderr
        cmd.Stderr = os.Stderr
-       cmd.Env = append(os.Environ(), "config_method=pam")
+       cmd.Env = append(os.Environ(), "config_method=pam", "pgport="+pgproxy.Port())
        err := cmd.Run()
        c.Check(err, check.IsNil)
 }
@@ -39,10 +42,13 @@ func (s *LDAPSuite) TestLoginLDAPBuiltin(c *check.C) {
        if !haveDocker() {
                c.Skip("skipping docker test because docker is not available")
        }
+       pgproxy := newPgProxy(c, s.cluster)
+       defer pgproxy.Close()
+
        cmd := exec.Command("bash", "login_ldap_docker_test.sh")
        cmd.Stdout = os.Stderr
        cmd.Stderr = os.Stderr
-       cmd.Env = append(os.Environ(), "config_method=ldap")
+       cmd.Env = append(os.Environ(), "config_method=ldap", "pgport="+pgproxy.Port())
        err := cmd.Run()
        c.Check(err, check.IsNil)
 }
index 4e0679f620bf6b4ec67d8fc118b66db5ef3332ac..0225f204611d051ce5c17a6f5eb594f5845aaa18 100755 (executable)
@@ -1,5 +1,9 @@
 #!/bin/bash
 
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 # This script demonstrates using LDAP for Arvados user authentication.
 #
 # It configures arvados controller in a docker container, optionally
@@ -74,6 +78,7 @@ Clusters:
       Connection:
         client_encoding: utf8
         host: ${hostname}
+        port: "${pgport}"
         dbname: arvados_test
         user: arvados
         password: insecure_arvados_test
index 9a8f83f857092951f76d4f97d94f5fdef66626a1..64ae58bce2681f792020b1855c2465d5ad226ae1 100644 (file)
@@ -6,6 +6,7 @@ package localdb
 
 import (
        "context"
+       "database/sql"
        "encoding/json"
        "net"
        "net/http"
@@ -26,6 +27,11 @@ type LDAPSuite struct {
        cluster *arvados.Cluster
        ctrl    *ldapLoginController
        ldap    *godap.LDAPServer // fake ldap server that accepts auth goodusername/goodpassword
+       db      *sql.DB
+
+       // transaction context
+       ctx      context.Context
+       rollback func()
 }
 
 func (s *LDAPSuite) TearDownSuite(c *check.C) {
@@ -85,10 +91,21 @@ func (s *LDAPSuite) SetUpSuite(c *check.C) {
                Cluster:    s.cluster,
                RailsProxy: railsproxy.NewConn(s.cluster),
        }
+       s.db = testdb(c, s.cluster)
+}
+
+// SetUpTest is the gocheck per-test fixture: it gets a new
+// transaction context and matching rollback func from testctx, using
+// the suite's database handle, and stores them in s.ctx and
+// s.rollback.
+func (s *LDAPSuite) SetUpTest(c *check.C) {
+       s.ctx, s.rollback = testctx(c, s.db)
+}
+
+// TearDownTest calls the rollback func saved by SetUpTest.
+func (s *LDAPSuite) TearDownTest(c *check.C) {
+       s.rollback()
+}
 
 func (s *LDAPSuite) TestLoginSuccess(c *check.C) {
-       resp, err := s.ctrl.UserAuthenticate(context.Background(), arvados.UserAuthenticateOptions{
+       conn := NewConn(s.cluster)
+       conn.loginController = s.ctrl
+       resp, err := conn.UserAuthenticate(s.ctx, arvados.UserAuthenticateOptions{
                Username: "goodusername",
                Password: "goodpassword",
        })
@@ -97,7 +114,7 @@ func (s *LDAPSuite) TestLoginSuccess(c *check.C) {
        c.Check(resp.UUID, check.Matches, `zzzzz-gj3su-.*`)
        c.Check(resp.Scopes, check.DeepEquals, []string{"all"})
 
-       ctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{"v2/" + resp.UUID + "/" + resp.APIToken}})
+       ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{"v2/" + resp.UUID + "/" + resp.APIToken}})
        user, err := railsproxy.NewConn(s.cluster).UserGetCurrent(ctx, arvados.GetOptions{})
        c.Check(err, check.IsNil)
        c.Check(user.Email, check.Equals, "goodusername@example.com")
@@ -107,7 +124,7 @@ func (s *LDAPSuite) TestLoginSuccess(c *check.C) {
 func (s *LDAPSuite) TestLoginFailure(c *check.C) {
        // search returns no results
        s.cluster.Login.LDAP.SearchBase = "dc=example,dc=invalid"
-       resp, err := s.ctrl.UserAuthenticate(context.Background(), arvados.UserAuthenticateOptions{
+       resp, err := s.ctrl.UserAuthenticate(s.ctx, arvados.UserAuthenticateOptions{
                Username: "goodusername",
                Password: "goodpassword",
        })
@@ -120,7 +137,7 @@ func (s *LDAPSuite) TestLoginFailure(c *check.C) {
 
        // search returns result, but auth fails
        s.cluster.Login.LDAP.SearchBase = "dc=example,dc=com"
-       resp, err = s.ctrl.UserAuthenticate(context.Background(), arvados.UserAuthenticateOptions{
+       resp, err = s.ctrl.UserAuthenticate(s.ctx, arvados.UserAuthenticateOptions{
                Username: "badusername",
                Password: "badpassword",
        })
index 1345e86900dd1056da5a9259bf0d5caf179253e5..2ccb1fce2a1e9dc42592f0543b2b0fc03f7d6fea 100644 (file)
@@ -39,7 +39,6 @@ var _ = check.Suite(&OIDCLoginSuite{})
 
 type OIDCLoginSuite struct {
        cluster               *arvados.Cluster
-       ctx                   context.Context
        localdb               *Conn
        railsSpy              *arvadostest.Proxy
        fakeIssuer            *httptest.Server
index 939868a17b94f132644e3459292290294514e84f..d7381860ea422299406e0a38e726f6d09bb38481 100644 (file)
@@ -77,9 +77,7 @@ func (p *proxy) Do(
                Header: hdrOut,
                Body:   reqIn.Body,
        }).WithContext(reqIn.Context())
-
-       resp, err := client.Do(reqOut)
-       return resp, err
+       return client.Do(reqOut)
 }
 
 // Copy a response (or error) to the downstream client
index c347e2f795517f74c9f67ec0311ba41d3250dafb..29c81ac5cae9ac63431e691852230a00c2335afe 100644 (file)
@@ -19,144 +19,154 @@ import (
 )
 
 type router struct {
-       mux *mux.Router
-       fed arvados.API
+       mux       *mux.Router
+       backend   arvados.API
+       wrapCalls func(RoutableFunc) RoutableFunc
 }
 
-func New(fed arvados.API) *router {
+// New returns a new router (which implements the http.Handler
+// interface) that serves requests by calling Arvados API methods on
+// the given backend.
+//
+// If wrapCalls is not nil, it is called once for each API method, and
+// the returned method is used in its place. This can be used to
+// install hooks before and after each API call and alter responses;
+// see localdb.WrapCallsInTransactions for an example.
+func New(backend arvados.API, wrapCalls func(RoutableFunc) RoutableFunc) *router {
        rtr := &router{
-               mux: mux.NewRouter(),
-               fed: fed,
+               mux:       mux.NewRouter(),
+               backend:   backend,
+               wrapCalls: wrapCalls,
        }
        rtr.addRoutes()
        return rtr
 }
 
-type routableFunc func(ctx context.Context, opts interface{}) (interface{}, error)
+type RoutableFunc func(ctx context.Context, opts interface{}) (interface{}, error)
 
 func (rtr *router) addRoutes() {
        for _, route := range []struct {
                endpoint    arvados.APIEndpoint
                defaultOpts func() interface{}
-               exec        routableFunc
+               exec        RoutableFunc
        }{
                {
                        arvados.EndpointConfigGet,
                        func() interface{} { return &struct{}{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.ConfigGet(ctx)
+                               return rtr.backend.ConfigGet(ctx)
                        },
                },
                {
                        arvados.EndpointLogin,
                        func() interface{} { return &arvados.LoginOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.Login(ctx, *opts.(*arvados.LoginOptions))
+                               return rtr.backend.Login(ctx, *opts.(*arvados.LoginOptions))
                        },
                },
                {
                        arvados.EndpointLogout,
                        func() interface{} { return &arvados.LogoutOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.Logout(ctx, *opts.(*arvados.LogoutOptions))
+                               return rtr.backend.Logout(ctx, *opts.(*arvados.LogoutOptions))
                        },
                },
                {
                        arvados.EndpointCollectionCreate,
                        func() interface{} { return &arvados.CreateOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.CollectionCreate(ctx, *opts.(*arvados.CreateOptions))
+                               return rtr.backend.CollectionCreate(ctx, *opts.(*arvados.CreateOptions))
                        },
                },
                {
                        arvados.EndpointCollectionUpdate,
                        func() interface{} { return &arvados.UpdateOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.CollectionUpdate(ctx, *opts.(*arvados.UpdateOptions))
+                               return rtr.backend.CollectionUpdate(ctx, *opts.(*arvados.UpdateOptions))
                        },
                },
                {
                        arvados.EndpointCollectionGet,
                        func() interface{} { return &arvados.GetOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.CollectionGet(ctx, *opts.(*arvados.GetOptions))
+                               return rtr.backend.CollectionGet(ctx, *opts.(*arvados.GetOptions))
                        },
                },
                {
                        arvados.EndpointCollectionList,
                        func() interface{} { return &arvados.ListOptions{Limit: -1} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.CollectionList(ctx, *opts.(*arvados.ListOptions))
+                               return rtr.backend.CollectionList(ctx, *opts.(*arvados.ListOptions))
                        },
                },
                {
                        arvados.EndpointCollectionProvenance,
                        func() interface{} { return &arvados.GetOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.CollectionProvenance(ctx, *opts.(*arvados.GetOptions))
+                               return rtr.backend.CollectionProvenance(ctx, *opts.(*arvados.GetOptions))
                        },
                },
                {
                        arvados.EndpointCollectionUsedBy,
                        func() interface{} { return &arvados.GetOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.CollectionUsedBy(ctx, *opts.(*arvados.GetOptions))
+                               return rtr.backend.CollectionUsedBy(ctx, *opts.(*arvados.GetOptions))
                        },
                },
                {
                        arvados.EndpointCollectionDelete,
                        func() interface{} { return &arvados.DeleteOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.CollectionDelete(ctx, *opts.(*arvados.DeleteOptions))
+                               return rtr.backend.CollectionDelete(ctx, *opts.(*arvados.DeleteOptions))
                        },
                },
                {
                        arvados.EndpointCollectionTrash,
                        func() interface{} { return &arvados.DeleteOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.CollectionTrash(ctx, *opts.(*arvados.DeleteOptions))
+                               return rtr.backend.CollectionTrash(ctx, *opts.(*arvados.DeleteOptions))
                        },
                },
                {
                        arvados.EndpointCollectionUntrash,
                        func() interface{} { return &arvados.UntrashOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.CollectionUntrash(ctx, *opts.(*arvados.UntrashOptions))
+                               return rtr.backend.CollectionUntrash(ctx, *opts.(*arvados.UntrashOptions))
                        },
                },
                {
                        arvados.EndpointContainerCreate,
                        func() interface{} { return &arvados.CreateOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.ContainerCreate(ctx, *opts.(*arvados.CreateOptions))
+                               return rtr.backend.ContainerCreate(ctx, *opts.(*arvados.CreateOptions))
                        },
                },
                {
                        arvados.EndpointContainerUpdate,
                        func() interface{} { return &arvados.UpdateOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.ContainerUpdate(ctx, *opts.(*arvados.UpdateOptions))
+                               return rtr.backend.ContainerUpdate(ctx, *opts.(*arvados.UpdateOptions))
                        },
                },
                {
                        arvados.EndpointContainerGet,
                        func() interface{} { return &arvados.GetOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.ContainerGet(ctx, *opts.(*arvados.GetOptions))
+                               return rtr.backend.ContainerGet(ctx, *opts.(*arvados.GetOptions))
                        },
                },
                {
                        arvados.EndpointContainerList,
                        func() interface{} { return &arvados.ListOptions{Limit: -1} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.ContainerList(ctx, *opts.(*arvados.ListOptions))
+                               return rtr.backend.ContainerList(ctx, *opts.(*arvados.ListOptions))
                        },
                },
                {
                        arvados.EndpointContainerDelete,
                        func() interface{} { return &arvados.DeleteOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.ContainerDelete(ctx, *opts.(*arvados.DeleteOptions))
+                               return rtr.backend.ContainerDelete(ctx, *opts.(*arvados.DeleteOptions))
                        },
                },
                {
@@ -165,7 +175,7 @@ func (rtr *router) addRoutes() {
                                return &arvados.GetOptions{Select: []string{"uuid", "state", "priority", "auth_uuid", "locked_by_uuid"}}
                        },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.ContainerLock(ctx, *opts.(*arvados.GetOptions))
+                               return rtr.backend.ContainerLock(ctx, *opts.(*arvados.GetOptions))
                        },
                },
                {
@@ -174,144 +184,148 @@ func (rtr *router) addRoutes() {
                                return &arvados.GetOptions{Select: []string{"uuid", "state", "priority", "auth_uuid", "locked_by_uuid"}}
                        },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.ContainerUnlock(ctx, *opts.(*arvados.GetOptions))
+                               return rtr.backend.ContainerUnlock(ctx, *opts.(*arvados.GetOptions))
                        },
                },
                {
                        arvados.EndpointSpecimenCreate,
                        func() interface{} { return &arvados.CreateOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.SpecimenCreate(ctx, *opts.(*arvados.CreateOptions))
+                               return rtr.backend.SpecimenCreate(ctx, *opts.(*arvados.CreateOptions))
                        },
                },
                {
                        arvados.EndpointSpecimenUpdate,
                        func() interface{} { return &arvados.UpdateOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.SpecimenUpdate(ctx, *opts.(*arvados.UpdateOptions))
+                               return rtr.backend.SpecimenUpdate(ctx, *opts.(*arvados.UpdateOptions))
                        },
                },
                {
                        arvados.EndpointSpecimenGet,
                        func() interface{} { return &arvados.GetOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.SpecimenGet(ctx, *opts.(*arvados.GetOptions))
+                               return rtr.backend.SpecimenGet(ctx, *opts.(*arvados.GetOptions))
                        },
                },
                {
                        arvados.EndpointSpecimenList,
                        func() interface{} { return &arvados.ListOptions{Limit: -1} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.SpecimenList(ctx, *opts.(*arvados.ListOptions))
+                               return rtr.backend.SpecimenList(ctx, *opts.(*arvados.ListOptions))
                        },
                },
                {
                        arvados.EndpointSpecimenDelete,
                        func() interface{} { return &arvados.DeleteOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.SpecimenDelete(ctx, *opts.(*arvados.DeleteOptions))
+                               return rtr.backend.SpecimenDelete(ctx, *opts.(*arvados.DeleteOptions))
                        },
                },
                {
                        arvados.EndpointUserCreate,
                        func() interface{} { return &arvados.CreateOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserCreate(ctx, *opts.(*arvados.CreateOptions))
+                               return rtr.backend.UserCreate(ctx, *opts.(*arvados.CreateOptions))
                        },
                },
                {
                        arvados.EndpointUserMerge,
                        func() interface{} { return &arvados.UserMergeOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserMerge(ctx, *opts.(*arvados.UserMergeOptions))
+                               return rtr.backend.UserMerge(ctx, *opts.(*arvados.UserMergeOptions))
                        },
                },
                {
                        arvados.EndpointUserActivate,
                        func() interface{} { return &arvados.UserActivateOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserActivate(ctx, *opts.(*arvados.UserActivateOptions))
+                               return rtr.backend.UserActivate(ctx, *opts.(*arvados.UserActivateOptions))
                        },
                },
                {
                        arvados.EndpointUserSetup,
                        func() interface{} { return &arvados.UserSetupOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserSetup(ctx, *opts.(*arvados.UserSetupOptions))
+                               return rtr.backend.UserSetup(ctx, *opts.(*arvados.UserSetupOptions))
                        },
                },
                {
                        arvados.EndpointUserUnsetup,
                        func() interface{} { return &arvados.GetOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserUnsetup(ctx, *opts.(*arvados.GetOptions))
+                               return rtr.backend.UserUnsetup(ctx, *opts.(*arvados.GetOptions))
                        },
                },
                {
                        arvados.EndpointUserGetCurrent,
                        func() interface{} { return &arvados.GetOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserGetCurrent(ctx, *opts.(*arvados.GetOptions))
+                               return rtr.backend.UserGetCurrent(ctx, *opts.(*arvados.GetOptions))
                        },
                },
                {
                        arvados.EndpointUserGetSystem,
                        func() interface{} { return &arvados.GetOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserGetSystem(ctx, *opts.(*arvados.GetOptions))
+                               return rtr.backend.UserGetSystem(ctx, *opts.(*arvados.GetOptions))
                        },
                },
                {
                        arvados.EndpointUserGet,
                        func() interface{} { return &arvados.GetOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserGet(ctx, *opts.(*arvados.GetOptions))
+                               return rtr.backend.UserGet(ctx, *opts.(*arvados.GetOptions))
                        },
                },
                {
                        arvados.EndpointUserUpdateUUID,
                        func() interface{} { return &arvados.UpdateUUIDOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserUpdateUUID(ctx, *opts.(*arvados.UpdateUUIDOptions))
+                               return rtr.backend.UserUpdateUUID(ctx, *opts.(*arvados.UpdateUUIDOptions))
                        },
                },
                {
                        arvados.EndpointUserUpdate,
                        func() interface{} { return &arvados.UpdateOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserUpdate(ctx, *opts.(*arvados.UpdateOptions))
+                               return rtr.backend.UserUpdate(ctx, *opts.(*arvados.UpdateOptions))
                        },
                },
                {
                        arvados.EndpointUserList,
                        func() interface{} { return &arvados.ListOptions{Limit: -1} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserList(ctx, *opts.(*arvados.ListOptions))
+                               return rtr.backend.UserList(ctx, *opts.(*arvados.ListOptions))
                        },
                },
                {
                        arvados.EndpointUserBatchUpdate,
                        func() interface{} { return &arvados.UserBatchUpdateOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserBatchUpdate(ctx, *opts.(*arvados.UserBatchUpdateOptions))
+                               return rtr.backend.UserBatchUpdate(ctx, *opts.(*arvados.UserBatchUpdateOptions))
                        },
                },
                {
                        arvados.EndpointUserDelete,
                        func() interface{} { return &arvados.DeleteOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserDelete(ctx, *opts.(*arvados.DeleteOptions))
+                               return rtr.backend.UserDelete(ctx, *opts.(*arvados.DeleteOptions))
                        },
                },
                {
                        arvados.EndpointUserAuthenticate,
                        func() interface{} { return &arvados.UserAuthenticateOptions{} },
                        func(ctx context.Context, opts interface{}) (interface{}, error) {
-                               return rtr.fed.UserAuthenticate(ctx, *opts.(*arvados.UserAuthenticateOptions))
+                               return rtr.backend.UserAuthenticate(ctx, *opts.(*arvados.UserAuthenticateOptions))
                        },
                },
        } {
-               rtr.addRoute(route.endpoint, route.defaultOpts, route.exec)
+               exec := route.exec
+               if rtr.wrapCalls != nil {
+                       exec = rtr.wrapCalls(exec)
+               }
+               rtr.addRoute(route.endpoint, route.defaultOpts, exec)
        }
        rtr.mux.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
                httpserver.Errors(w, []string{"API endpoint not found"}, http.StatusNotFound)
@@ -326,7 +340,7 @@ var altMethod = map[string]string{
        "GET":   "HEAD", // Accept HEAD at any GET route
 }
 
-func (rtr *router) addRoute(endpoint arvados.APIEndpoint, defaultOpts func() interface{}, exec routableFunc) {
+func (rtr *router) addRoute(endpoint arvados.APIEndpoint, defaultOpts func() interface{}, exec RoutableFunc) {
        methods := []string{endpoint.Method}
        if alt, ok := altMethod[endpoint.Method]; ok {
                methods = append(methods, alt)
index 4cabe70f162a6f36360da58f7c820e1712e0728f..c73bc64915f12aff293f23c803e25771669fe8a9 100644 (file)
@@ -38,8 +38,8 @@ type RouterSuite struct {
 func (s *RouterSuite) SetUpTest(c *check.C) {
        s.stub = arvadostest.APIStub{}
        s.rtr = &router{
-               mux: mux.NewRouter(),
-               fed: &s.stub,
+               mux:     mux.NewRouter(),
+               backend: &s.stub,
        }
        s.rtr.addRoutes()
 }
@@ -169,7 +169,7 @@ func (s *RouterIntegrationSuite) SetUpTest(c *check.C) {
        cluster.TLS.Insecure = true
        arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
        url, _ := url.Parse("https://" + os.Getenv("ARVADOS_TEST_API_HOST"))
-       s.rtr = New(rpc.NewConn("zzzzz", url, true, rpc.PassthroughTokenProvider))
+       s.rtr = New(rpc.NewConn("zzzzz", url, true, rpc.PassthroughTokenProvider), nil)
 }
 
 func (s *RouterIntegrationSuite) TearDownSuite(c *check.C) {
index cea4607c98fe533fec8b37f839f1f641ce3fcccb..da466c31ca7d2a3a4a13f23fc92177d04ef087ec 100644 (file)
@@ -204,8 +204,8 @@ var errNotFound = errors.New("not found")
 
 // Finds the timestamp of the newest copy of blk on svc. Returns
 // errNotFound if blk is not on svc at all.
-func (rcvr recoverer) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
-       found, err := svc.Index(rcvr.client, blk)
+func (rcvr recoverer) newestMtime(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
+       found, err := svc.Index(ctx, rcvr.client, blk)
        if err != nil {
                logger.WithError(err).Warn("error getting index")
                return time.Time{}, err
@@ -236,7 +236,7 @@ var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -
 // saved. But if the block's timestamp is more recent than blobsigttl,
 // keepstore will refuse to trash it even if told to by keep-balance.
 func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
-       if latest, err := rcvr.newestMtime(logger, blk, svc); err != nil {
+       if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err != nil {
                return err
        } else if latest.Add(blobsigttl).After(blobsigexp) {
                return nil
@@ -245,7 +245,7 @@ func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger,
                return fmt.Errorf("error updating timestamp: %s", err)
        }
        logger.Debug("updated timestamp")
-       if latest, err := rcvr.newestMtime(logger, blk, svc); err == errNotFound {
+       if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err == errNotFound {
                return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
        } else if err != nil {
                return err
index 7bb66a158e50646f1fc984df7ea30ccd31528bf3..71e499ebcab0cca29ccbee7a350cfbbb5aaa6e19 100644 (file)
@@ -169,21 +169,47 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
         #
         # Found a file, check for secondaryFiles
         #
-        primary["secondaryFiles"] = []
+        specs = []
+        primary["secondaryFiles"] = secondaryspec
         for i, sf in enumerate(aslist(secondaryspec)):
             pattern = builder.do_eval(sf["pattern"], context=primary)
             if pattern is None:
                 continue
+            if isinstance(pattern, list):
+                specs.extend(pattern)
+            elif isinstance(pattern, dict):
+                specs.append(pattern)
+            elif isinstance(pattern, str):
+                specs.append({"pattern": pattern})
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+        found = []
+        for i, sf in enumerate(specs):
+            if isinstance(sf, dict):
+                if sf.get("class") == "File":
+                    pattern = sf["basename"]
+                else:
+                    pattern = sf["pattern"]
+                    required = sf.get("required")
+            elif isinstance(sf, str):
+                pattern = sf
+                required = True
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
             sfpath = substitute(primary["location"], pattern)
-            required = builder.do_eval(sf.get("required"), context=primary)
+            required = builder.do_eval(required, context=primary)
 
             if fsaccess.exists(sfpath):
-                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
+                found.append({"location": sfpath, "class": "File"})
             elif required:
                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                     "Required secondary file '%s' does not exist" % sfpath)
 
-        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
+        primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
             discovered[primary["location"]] = primary["secondaryFiles"]
     elif inputschema["type"] not in primitive_types_set:
@@ -434,9 +460,13 @@ def packed_workflow(arvrunner, tool, merged_map):
     def visit(v, cur_id):
         if isinstance(v, dict):
             if v.get("class") in ("CommandLineTool", "Workflow"):
-                if "id" not in v:
-                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
-                cur_id = rewrite_to_orig.get(v["id"], v["id"])
+                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
+                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
+                if "id" in v:
+                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
+            if "path" in v and "location" not in v:
+                v["location"] = v["path"]
+                del v["path"]
             if "location" in v and not v["location"].startswith("keep:"):
                 v["location"] = merged_map[cur_id].resolved[v["location"]]
             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
index df9fac8426cc450f0dc7014c72d0e799863657cb..c4c0968756a46b04ad8b201cbc66241fb4d6826d 100644 (file)
   output:
     out: null
   tool: wf-defaults/wf4.cwl
-  doc: default in embedded subworkflow missing 'id' field
+  doc: default in embedded subworkflow missing 'id' field, v1.0
   should_fail: true
 
+- job: null
+  output:
+    out: null
+  tool: wf-defaults/wf8.cwl
+  doc: default in embedded subworkflow missing 'id' field, v1.1
+  should_fail: false
+
 - job: null
   output:
     out: null
index 8bfc5d63f744a784e13d78ec48049211ae629c48..bd927824886d1805cf8daf260e4911e6b4fe2d85 100644 (file)
@@ -21,4 +21,4 @@ steps:
             class: Directory
             location: inp1
       outputs: []
-      arguments: [echo, $(inputs.inp2)]
\ No newline at end of file
+      arguments: [echo, $(inputs.inp2)]
diff --git a/sdk/cwl/tests/wf-defaults/default-dir8.cwl b/sdk/cwl/tests/wf-defaults/default-dir8.cwl
new file mode 100644 (file)
index 0000000..a5b9c2f
--- /dev/null
@@ -0,0 +1,24 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.1
+class: Workflow
+inputs: []
+outputs: []
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+steps:
+  step1:
+    in: []
+    out: []
+    run:
+      class: CommandLineTool
+      inputs:
+        inp2:
+          type: Directory
+          default:
+            class: Directory
+            location: inp1
+      outputs: []
+      arguments: [echo, $(inputs.inp2)]
index 6e562e43dbd791f390dd25f6803e4a23c49ce967..3f498fdffbfa56100c721f6efb78efcb40267f74 100644 (file)
@@ -14,4 +14,4 @@ steps:
   step1:
     in: []
     out: []
-    run: default-dir4.cwl
\ No newline at end of file
+    run: default-dir4.cwl
diff --git a/sdk/cwl/tests/wf-defaults/wf8.cwl b/sdk/cwl/tests/wf-defaults/wf8.cwl
new file mode 100644 (file)
index 0000000..2548fae
--- /dev/null
@@ -0,0 +1,17 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.1
+class: Workflow
+inputs: []
+outputs: []
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  SubworkflowFeatureRequirement: {}
+steps:
+  step1:
+    in: []
+    out: []
+    run: default-dir8.cwl
index 1e2c07e867e84d6d6719fdd4f9298b005a86c6e9..562c8c1e7d7c66528a2ce0874eca034c9eb7b328 100644 (file)
@@ -57,9 +57,16 @@ type Client struct {
        // HTTP headers to add/override in outgoing requests.
        SendHeader http.Header
 
+       // Timeout for requests. NewClientFromConfig and
+       // NewClientFromEnv return a Client with a default 5 minute
+       // timeout.  To disable this timeout and rely on each
+       // http.Request's context deadline instead, set Timeout to
+       // zero.
+       Timeout time.Duration
+
        dd *DiscoveryDocument
 
-       ctx context.Context
+       defaultRequestID string
 }
 
 // The default http.Client used by a Client with Insecure==true and
@@ -67,12 +74,10 @@ type Client struct {
 var InsecureHTTPClient = &http.Client{
        Transport: &http.Transport{
                TLSClientConfig: &tls.Config{
-                       InsecureSkipVerify: true}},
-       Timeout: 5 * time.Minute}
+                       InsecureSkipVerify: true}}}
 
 // The default http.Client used by a Client otherwise.
-var DefaultSecureClient = &http.Client{
-       Timeout: 5 * time.Minute}
+var DefaultSecureClient = &http.Client{}
 
 // NewClientFromConfig creates a new Client that uses the endpoints in
 // the given cluster.
@@ -87,6 +92,7 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
                Scheme:   ctrlURL.Scheme,
                APIHost:  ctrlURL.Host,
                Insecure: cluster.TLS.Insecure,
+               Timeout:  5 * time.Minute,
        }, nil
 }
 
@@ -116,6 +122,7 @@ func NewClientFromEnv() *Client {
                AuthToken:       os.Getenv("ARVADOS_API_TOKEN"),
                Insecure:        insecure,
                KeepServiceURIs: svcs,
+               Timeout:         5 * time.Minute,
        }
 }
 
@@ -131,11 +138,12 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
        }
 
        if req.Header.Get("X-Request-Id") == "" {
-               reqid, _ := req.Context().Value(contextKeyRequestID{}).(string)
-               if reqid == "" {
-                       reqid, _ = c.context().Value(contextKeyRequestID{}).(string)
-               }
-               if reqid == "" {
+               var reqid string
+               if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+                       reqid = ctxreqid
+               } else if c.defaultRequestID != "" {
+                       reqid = c.defaultRequestID
+               } else {
                        reqid = reqIDGen.Next()
                }
                if req.Header == nil {
@@ -144,7 +152,36 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
                        req.Header.Set("X-Request-Id", reqid)
                }
        }
-       return c.httpClient().Do(req)
+       var cancel context.CancelFunc
+       if c.Timeout > 0 {
+               ctx := req.Context()
+               ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
+               req = req.WithContext(ctx)
+       }
+       resp, err := c.httpClient().Do(req)
+       if err == nil && cancel != nil {
+               // We need to call cancel() eventually, but we can't
+               // use "defer cancel()" because the context has to
+               // stay alive until the caller has finished reading
+               // the response body.
+               resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
+       } else if cancel != nil {
+               cancel()
+       }
+       return resp, err
+}
+
+// cancelOnClose calls a provided CancelFunc when its wrapped
+// ReadCloser's Close() method is called.
+type cancelOnClose struct {
+       io.ReadCloser
+       cancel context.CancelFunc
+}
+
+func (coc cancelOnClose) Close() error {
+       err := coc.ReadCloser.Close()
+       coc.cancel()
+       return err
 }
 
 func isRedirectStatus(code int) bool {
@@ -266,7 +303,7 @@ func anythingToValues(params interface{}) (url.Values, error) {
 //
 // path must not contain a query string.
 func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
-       return c.RequestAndDecodeContext(c.context(), dst, method, path, body, params)
+       return c.RequestAndDecodeContext(context.Background(), dst, method, path, body, params)
 }
 
 func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, method, path string, body io.Reader, params interface{}) error {
@@ -332,17 +369,10 @@ func (c *Client) UpdateBody(rsc resource) io.Reader {
 // header.
 func (c *Client) WithRequestID(reqid string) *Client {
        cc := *c
-       cc.ctx = ContextWithRequestID(cc.context(), reqid)
+       cc.defaultRequestID = reqid
        return &cc
 }
 
-func (c *Client) context() context.Context {
-       if c.ctx == nil {
-               return context.Background()
-       }
-       return c.ctx
-}
-
 func (c *Client) httpClient() *http.Client {
        switch {
        case c.Client != nil:
index 029e223218b2a5136b8eac2238b088e2ce4fb983..a54712f330ea2b1ff2a6b8107daec5639c082c32 100644 (file)
@@ -126,6 +126,7 @@ type Cluster struct {
                BalancePeriod            Duration
                BalanceCollectionBatch   int
                BalanceCollectionBuffers int
+               BalanceTimeout           Duration
 
                WebDAVCache WebDAVCacheConfig
        }
index 3af7479202efb46df838e48246ccf9c2f0b5b100..da1710374e1e69ca4bfe6c2f77c8b990a1f7dc4e 100644 (file)
@@ -141,20 +141,20 @@ func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error
 }
 
 // Index returns an unsorted list of blocks at the given mount point.
-func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
-       return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+       return s.index(ctx, c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
 }
 
 // Index returns an unsorted list of blocks that can be retrieved from
 // this server.
-func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
-       return s.index(c, s.url("index/"+prefix))
+func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+       return s.index(ctx, c, s.url("index/"+prefix))
 }
 
-func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
-       req, err := http.NewRequest("GET", url, nil)
+func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepServiceIndexEntry, error) {
+       req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
        if err != nil {
-               return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+               return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
        }
        resp, err := c.Do(req)
        if err != nil {
index 8715f74f0bd6ec24abf72f1622c55f0f3d9cd76f..3a82f4b7ee2f0fac31f86ecd338037319c429f3a 100644 (file)
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+       "context"
        "net/http"
 
        check "gopkg.in/check.v1"
@@ -22,6 +23,6 @@ func (*KeepServiceSuite) TestIndexTimeout(c *check.C) {
                APIHost:   "zzzzz.arvadosapi.com",
                AuthToken: "xyzzy",
        }
-       _, err := (&KeepService{}).IndexMount(client, "fake", "")
+       _, err := (&KeepService{}).IndexMount(context.Background(), client, "fake", "")
        c.Check(err, check.ErrorMatches, `.*timeout.*`)
 }
index c8a1a27b79c1e939438dcb8ed603c206a299c409..127a09ee2db71a00bc7c05ee5e2e651ea379a33d 100644 (file)
@@ -180,7 +180,7 @@ GEM
     pg (1.1.4)
     power_assert (1.1.4)
     public_suffix (4.0.3)
-    rack (2.2.2)
+    rack (2.2.3)
     rack-test (0.6.3)
       rack (>= 1.0)
     rails (5.0.7.2)
index 5dd127b3e230246962b179f2431d9ad49a95483a..5bb013c9add7a1f241d4779768cef462ac9956b2 100644 (file)
@@ -2,6 +2,8 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
+require 'update_permissions'
+
 include CurrentApiClient
 
 def fix_roles_projects
@@ -20,10 +22,11 @@ def fix_roles_projects
 
           if old_owner != system_user_uuid
             # 2) Ownership of a role becomes a can_manage link
-            Link.create!(link_class: 'permission',
+            Link.new(link_class: 'permission',
                          name: 'can_manage',
                          tail_uuid: old_owner,
-                         head_uuid: g.uuid)
+                         head_uuid: g.uuid).
+              save!(validate: false)
           end
         end
 
@@ -37,20 +40,22 @@ def fix_roles_projects
           # 3) If a role owns anything, give it to system user and it
           # becomes a can_manage link
           klass.joins("join groups on groups.uuid=#{klass.table_name}.owner_uuid and groups.group_class='role'").each do |owned|
-            Link.create!(link_class: 'permission',
-                         name: 'can_manage',
-                         tail_uuid: owned.owner_uuid,
-                         head_uuid: owned.uuid)
+            Link.new(link_class: 'permission',
+                     name: 'can_manage',
+                     tail_uuid: owned.owner_uuid,
+                     head_uuid: owned.uuid).
+              save!(validate: false)
             owned.owner_uuid = system_user_uuid
             owned.save_with_unique_name!
           end
         end
 
         Group.joins("join groups as g2 on g2.uuid=groups.owner_uuid and g2.group_class='role'").each do |owned|
-          Link.create!(link_class: 'permission',
+          Link.new(link_class: 'permission',
                        name: 'can_manage',
                        tail_uuid: owned.owner_uuid,
-                       head_uuid: owned.uuid)
+                       head_uuid: owned.uuid).
+            save!(validate: false)
           owned.owner_uuid = system_user_uuid
           owned.save_with_unique_name!
         end
index b30d1edc259833b0350bb949fa0c00249c8f051b..ee5dcd3421a9a0bbfd9e2a72be03fc9304fec21f 100644 (file)
@@ -198,6 +198,20 @@ foo_file_readable_by_active_redundant_permission_via_private_group:
   head_uuid: zzzzz-4zz18-znfnqtbbv4spc3w
   properties: {}
 
+foo_file_readable_by_project_viewer:
+  uuid: zzzzz-o0j2j-fp1d8395ldqw22p
+  owner_uuid: zzzzz-tpzed-000000000000000
+  created_at: 2014-01-24 20:42:26 -0800
+  modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+  modified_by_user_uuid: zzzzz-tpzed-000000000000000
+  modified_at: 2014-01-24 20:42:26 -0800
+  updated_at: 2014-01-24 20:42:26 -0800
+  tail_uuid: zzzzz-tpzed-projectviewer1a
+  link_class: permission
+  name: can_read
+  head_uuid: zzzzz-4zz18-znfnqtbbv4spc3w
+  properties: {}
+
 bar_file_readable_by_active:
   uuid: zzzzz-o0j2j-8hppiuduf8eqdng
   owner_uuid: zzzzz-tpzed-000000000000000
index 3d1fda927f0554ce7955a4e73d7e0e8921f7d8a5..30fddfa5b8be8f89c2f43651c1c316a6e59253fe 100644 (file)
@@ -6,6 +6,7 @@ require 'test_helper'
 require 'fix_roles_projects'
 
 class GroupTest < ActiveSupport::TestCase
+  include DbCurrentTime
 
   test "cannot set owner_uuid to object with existing ownership cycle" do
     set_user_from_auth :active_trustedclient
@@ -317,6 +318,14 @@ insert into groups (uuid, owner_uuid, name, group_class, created_at, updated_at)
     g6 = insert_group Group.generate_uuid, system_user_uuid, 'name collision', 'role'
     g7 = insert_group Group.generate_uuid, users(:active).uuid, 'name collision', 'role'
 
+    g8 = insert_group Group.generate_uuid, users(:active).uuid, 'trashed with no class', nil
+    g8obj = Group.find_by_uuid(g8)
+    g8obj.trash_at = db_current_time
+    g8obj.delete_at = db_current_time
+    act_as_system_user do
+      g8obj.save!(validate: false)
+    end
+
     refresh_permissions
 
     act_as_system_user do
@@ -328,6 +337,7 @@ update links set tail_uuid='#{g5}' where uuid='#{l1.uuid}'
     end
 
     assert_equal nil, Group.find_by_uuid(g1).group_class
+    assert_equal nil, Group.find_by_uuid(g8).group_class
     assert_equal users(:active).uuid, Group.find_by_uuid(g2).owner_uuid
     assert_equal g3, Group.find_by_uuid(g4).owner_uuid
     assert !Link.where(tail_uuid: users(:active).uuid, head_uuid: g2, link_class: "permission", name: "can_manage").any?
@@ -337,6 +347,7 @@ update links set tail_uuid='#{g5}' where uuid='#{l1.uuid}'
     fix_roles_projects
 
     assert_equal 'role', Group.find_by_uuid(g1).group_class
+    assert_equal 'role', Group.find_by_uuid(g8).group_class
     assert_equal system_user_uuid, Group.find_by_uuid(g2).owner_uuid
     assert_equal system_user_uuid, Group.find_by_uuid(g4).owner_uuid
     assert Link.where(tail_uuid: users(:active).uuid, head_uuid: g2, link_class: "permission", name: "can_manage").any?
index fd94ef7afa3340edea84a486e4abd03810fe1b8d..f789abe69270c024e73a5294666bc06169b45026 100644 (file)
@@ -9,6 +9,6 @@ case "$TARGET" in
         fpm_depends+=(fuse-libs)
         ;;
     debian* | ubuntu*)
-        fpm_depends+=(libcurl3-gnutls libpython2.7)
+        fpm_depends+=(libcurl3-gnutls)
         ;;
 esac
index 3c35d304cb395cf97485cd462f0f78ae31b523d6..86423a2976b1e0909470bd563c486aee894743af 100644 (file)
@@ -6,6 +6,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "fmt"
        "io"
@@ -71,6 +72,9 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 
        defer bal.time("sweep", "wall clock time to run one full sweep")()
 
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+       defer cancel()
+
        var lbFile *os.File
        if bal.LostBlocksFile != "" {
                tmpfn := bal.LostBlocksFile + ".tmp"
@@ -111,13 +115,21 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
        if err = bal.CheckSanityEarly(client); err != nil {
                return
        }
+
+       // On a big site, indexing and sending trash/pull lists can
+       // take much longer than the usual 5 minute client
+       // timeout. From here on, we rely on the context deadline
+       // instead, aborting the entire operation if any part takes
+       // too long.
+       client.Timeout = 0
+
        rs := bal.rendezvousState()
        if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
                if runOptions.SafeRendezvousState != "" {
                        bal.logf("notice: KeepServices list has changed since last run")
                }
                bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
-               if err = bal.ClearTrashLists(client); err != nil {
+               if err = bal.ClearTrashLists(ctx, client); err != nil {
                        return
                }
                // The current rendezvous state becomes "safe" (i.e.,
@@ -126,7 +138,8 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
                // succeed in clearing existing trash lists.
                nextRunOptions.SafeRendezvousState = rs
        }
-       if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
+
+       if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
                return
        }
        bal.ComputeChangeSets()
@@ -146,14 +159,14 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
                lbFile = nil
        }
        if runOptions.CommitPulls {
-               err = bal.CommitPulls(client)
+               err = bal.CommitPulls(ctx, client)
                if err != nil {
                        // Skip trash if we can't pull. (Too cautious?)
                        return
                }
        }
        if runOptions.CommitTrash {
-               err = bal.CommitTrash(client)
+               err = bal.CommitTrash(ctx, client)
        }
        return
 }
@@ -286,11 +299,11 @@ func (bal *Balancer) rendezvousState() string {
 // We avoid this problem if we clear all trash lists before getting
 // indexes. (We also assume there is only one rebalancing process
 // running at a time.)
-func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
        for _, srv := range bal.KeepServices {
                srv.ChangeSet = &ChangeSet{}
        }
-       return bal.CommitTrash(c)
+       return bal.CommitTrash(ctx, c)
 }
 
 // GetCurrentState determines the current replication state, and the
@@ -304,7 +317,10 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 // collection manifests in the database (API server).
 //
 // It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
+func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+
        defer bal.time("get_state", "wall clock time to get current state")()
        bal.BlockStateMap = NewBlockStateMap()
 
@@ -348,12 +364,13 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
                go func(mounts []*KeepMount) {
                        defer wg.Done()
                        bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
-                       idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+                       idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
                        if err != nil {
                                select {
                                case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
                                default:
                                }
+                               cancel()
                                return
                        }
                        if len(errs) > 0 {
@@ -391,6 +408,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
                                }
                                for range collQ {
                                }
+                               cancel()
                                return
                        }
                        bal.collScanned++
@@ -402,7 +420,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
        wg.Add(1)
        go func() {
                defer wg.Done()
-               err = EachCollection(c, pageSize,
+               err = EachCollection(ctx, c, pageSize,
                        func(coll arvados.Collection) error {
                                collQ <- coll
                                if len(errs) > 0 {
@@ -422,6 +440,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
                        case errs <- err:
                        default:
                        }
+                       cancel()
                }
        }()
 
@@ -1084,22 +1103,22 @@ func (bal *Balancer) CheckSanityLate() error {
 // keepstore servers. This has the effect of increasing replication of
 // existing blocks that are either underreplicated or poorly
 // distributed according to rendezvous hashing.
-func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
        defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
        return bal.commitAsync(c, "send pull list",
                func(srv *KeepService) error {
-                       return srv.CommitPulls(c)
+                       return srv.CommitPulls(ctx, c)
                })
 }
 
 // CommitTrash sends the computed lists of trash requests to the
 // keepstore servers. This has the effect of deleting blocks that are
 // overreplicated or unreferenced.
-func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
        defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
        return bal.commitAsync(c, "send trash list",
                func(srv *KeepService) error {
-                       return srv.CommitTrash(c)
+                       return srv.CommitTrash(ctx, c)
                })
 }
 
index c4ddc90c419ae7e0d9c4d1b5cbadd9011f2a31fd..1659918cafe20c62162abfb1e841a40f80170c0a 100644 (file)
@@ -5,6 +5,7 @@
 package main
 
 import (
+       "context"
        "fmt"
        "time"
 
@@ -30,7 +31,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
 //
 // If pageSize > 0 it is used as the maximum page size in each API
 // call; otherwise the maximum allowed page size is requested.
-func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
        if progress == nil {
                progress = func(_, _ int) {}
        }
@@ -75,7 +76,7 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
        for {
                progress(callCount, expectCount)
                var page arvados.CollectionList
-               err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+               err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
                if err != nil {
                        return err
                }
index f8921c294afa075f290c2db6fd352b315d25e8ac..3ab9d07b2e2ed6bcc7220ae17aad4e6e7a665855 100644 (file)
@@ -5,6 +5,7 @@
 package main
 
 import (
+       "context"
        "sync"
        "time"
 
@@ -29,7 +30,7 @@ func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
                        longestStreak := 0
                        var lastMod time.Time
                        sawUUID := make(map[string]bool)
-                       err := EachCollection(s.client, pageSize, func(c arvados.Collection) error {
+                       err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error {
                                if c.ModifiedAt.IsZero() {
                                        return nil
                                }
index e2adf1a4b79942b9457beb2ccd31df31abbb96b9..17f8418f622f992a7025db9b9214e60c5a39f2ca 100644 (file)
@@ -5,6 +5,7 @@
 package main
 
 import (
+       "context"
        "encoding/json"
        "fmt"
        "io"
@@ -35,19 +36,19 @@ func (srv *KeepService) URLBase() string {
 
 // CommitPulls sends the current list of pull requests to the storage
 // server (even if the list is empty).
-func (srv *KeepService) CommitPulls(c *arvados.Client) error {
-       return srv.put(c, "pull", srv.ChangeSet.Pulls)
+func (srv *KeepService) CommitPulls(ctx context.Context, c *arvados.Client) error {
+       return srv.put(ctx, c, "pull", srv.ChangeSet.Pulls)
 }
 
 // CommitTrash sends the current list of trash requests to the storage
 // server (even if the list is empty).
-func (srv *KeepService) CommitTrash(c *arvados.Client) error {
-       return srv.put(c, "trash", srv.ChangeSet.Trashes)
+func (srv *KeepService) CommitTrash(ctx context.Context, c *arvados.Client) error {
+       return srv.put(ctx, c, "trash", srv.ChangeSet.Trashes)
 }
 
 // Perform a PUT request at path, with data (as JSON) in the request
 // body.
-func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
+func (srv *KeepService) put(ctx context.Context, c *arvados.Client, path string, data interface{}) error {
        // We'll start a goroutine to do the JSON encoding, so we can
        // stream it to the http client through a Pipe, rather than
        // keeping the entire encoded version in memory.
@@ -64,7 +65,7 @@ func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) er
        }()
 
        url := srv.URLBase() + "/" + path
-       req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
+       req, err := http.NewRequestWithContext(ctx, "PUT", url, ioutil.NopCloser(jsonR))
        if err != nil {
                return fmt.Errorf("building request for %s: %v", url, err)
        }