Merge branch '15305-keep-balance-bytes'
author Tom Clegg <tclegg@veritasgenetics.com>
Tue, 12 Nov 2019 14:25:49 +0000 (09:25 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Tue, 12 Nov 2019 14:25:49 +0000 (09:25 -0500)
fixes #15305

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

72 files changed:
apps/workbench/app/models/arvados_resource_list.rb
apps/workbench/app/views/layouts/application.html.erb
apps/workbench/test/unit/arvados_resource_list_test.rb
build/run-tests.sh
doc/_config.yml
doc/admin/spot-instances.html.textile.liquid
doc/admin/upgrading.html.textile.liquid
doc/install/install-components.html.textile.liquid
doc/install/install-keep-web.html.textile.liquid
doc/install/install-workbench2-app.html.textile.liquid [new file with mode: 0644]
lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
lib/controller/federation/conn.go
lib/controller/federation/list.go
lib/controller/federation/list_test.go
lib/controller/federation/login_test.go [new file with mode: 0644]
lib/controller/handler.go
lib/controller/handler_test.go
lib/controller/localdb/conn.go [new file with mode: 0644]
lib/controller/localdb/login.go [new file with mode: 0644]
lib/controller/localdb/login_test.go [new file with mode: 0644]
lib/controller/railsproxy/railsproxy.go
lib/controller/router/response.go
lib/controller/router/router.go
lib/controller/rpc/conn.go
lib/dispatchcloud/worker/runner.go
lib/dispatchcloud/worker/worker_test.go
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/tests/test_submit.py
sdk/go/arvados/api.go
sdk/go/arvados/client.go
sdk/go/arvados/config.go
sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/fs_site.go
sdk/go/arvados/login.go [new file with mode: 0644]
sdk/go/arvadostest/api.go
sdk/go/arvadostest/proxy.go [new file with mode: 0644]
sdk/go/httpserver/logger.go
sdk/go/httpserver/logger_test.go
sdk/go/httpserver/responsewriter.go
sdk/java-v2/src/main/java/org/arvados/client/logic/collection/FileToken.java
sdk/java-v2/src/main/java/org/arvados/client/logic/keep/FileDownloader.java
sdk/java-v2/src/test/java/org/arvados/client/logic/collection/FileTokenTest.java
sdk/python/arvados/commands/arv_copy.py
services/api/app/controllers/user_sessions_controller.rb
services/api/app/models/user.rb
services/api/config/arvados_config.rb
services/api/test/functional/user_sessions_controller_test.rb
services/api/test/unit/user_test.rb
services/crunch-dispatch-slurm/crunch-dispatch-slurm.service
services/crunch-run/copier.go
services/crunch-run/crunchrun.go
services/keep-web/handler.go
services/keep-web/handler_test.go
services/keepstore/command.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/trash_worker.go
services/keepstore/trash_worker_test.go
services/keepstore/unix_volume.go
services/keepstore/unix_volume_test.go
services/ws/arvados-ws.service
tools/arvbox/lib/arvbox/docker/cluster-config.sh
tools/arvbox/lib/arvbox/docker/common.sh
tools/arvbox/lib/arvbox/docker/service/nginx/run
vendor/vendor.json

index cbd544ebb798f8413d499748b59d57555fd48d7f..99502bd56ed04951695e8bcb15704b64ea4b46e5 100644 (file)
@@ -147,11 +147,16 @@ class ArvadosResourceList
     if not @results.nil?
       @results.each(&block)
     else
+      results = []
       self.each_page do |items|
         items.each do |i|
+          results << i
           block.call i
         end
       end
+      # Cache results only if all were retrieved (block didn't raise
+      # an exception).
+      @results = results
     end
     self
   end
index bd3afbb681f098a1f3fe726e7d9bc49f671240ac..4fc7da9949cc7ebdaca5bf3cf24d54456ae8d5b6 100644 (file)
@@ -20,7 +20,7 @@ SPDX-License-Identifier: AGPL-3.0 %>
   <meta name="author" content="">
   <% if current_user %>
     <% content_for :js do %>
-      window.defaultSession = <%=raw({baseURL: Rails.configuration.Services.Controller.ExternalURL.to_s, token: Thread.current[:arvados_api_token], user: current_user}.to_json)%>
+      window.defaultSession = <%=raw({baseURL: Rails.configuration.Services.Controller.ExternalURL.to_s.gsub(/\/?$/,'/'), token: Thread.current[:arvados_api_token], user: current_user}.to_json)%>
     <% end %>
   <% end %>
   <% if current_user and $arvados_api_client.discovery[:websocketUrl] %>
index e9eb2f8ef6a8a1ab6792665c10570e0b317129cd..270b96203b249276be213c7a14b7346abf4487ca 100644 (file)
@@ -103,4 +103,20 @@ class ResourceListTest < ActiveSupport::TestCase
     assert_nil c.items_available
     refute_empty c.results
   end
+
+  test 'cache results across each(&block) calls' do
+    use_token :admin
+    c = Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').with_count('none')
+    c.each do |x|
+      x.description = 'foo'
+    end
+    found = 0
+    c.each do |x|
+      found += 1
+      # We should get the same objects we modified in the loop above
+      # -- not new objects built from another set of API responses.
+      assert_equal 'foo', x.description
+    end
+    assert_equal 201, found
+  end
 end
index 766ff1b82465322cc07b40b235f80087804deb38..0014547ce5448ed2abdc727b3eb8bccf0f75820a 100755 (executable)
@@ -283,6 +283,10 @@ sanity_checks() {
       # needed for pkgdown, builds R SDK doc pages
       which pandoc || fatal "No pandoc. Try: apt-get install pandoc"
     fi
+    echo 'procs with /dev/fuse open:'
+    find /proc/*/fd -lname /dev/fuse 2>/dev/null | cut -d/ -f3 | xargs --no-run-if-empty ps -lywww
+    echo 'grep fuse /proc/self/mountinfo:'
+    grep fuse /proc/self/mountinfo
 }
 
 rotate_logfile() {
index 344456d1f9b445e74d86c4bd9fbd2c624804a38b..da7635c1f4c96813f0aa0473252d1e91ca57da57 100644 (file)
@@ -203,6 +203,7 @@ navbar:
     - User interface:
       - install/install-sso.html.textile.liquid
       - install/install-workbench-app.html.textile.liquid
+      - install/install-workbench2-app.html.textile.liquid
       - install/install-composer.html.textile.liquid
     - Additional services:
       - install/install-ws.html.textile.liquid
index 1c61b6074cc0a7e8c50250ef97ee4af20a2e024f..4bba0e89354c58495bf9721279e501f7b6eccc12 100644 (file)
@@ -1,7 +1,7 @@
 ---
 layout: default
 navsection: admin
-title: Using AWS Spot instances
+title: Using Preemptible instances
 ...
 
 {% comment %}
@@ -10,60 +10,49 @@ Copyright (C) The Arvados Authors. All rights reserved.
 SPDX-License-Identifier: CC-BY-SA-3.0
 {% endcomment %}
 
-This page describes how to set up the system to take advantage of "Amazon's EC2 spot instances":https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-spot-instances.html.
+This page describes how to enable preemptible instances.  Preemptible instances typically offer lower-cost computation in exchange for weaker availability guarantees.  If a compute node is preempted, Arvados will restart the computation on a new instance.
 
-h3. Nodemanager
+Currently Arvados supports preemptible instances using AWS spot instances.
 
-Nodemanager should have configured cloud sizes that include the @preemptible@ boolean parameter. For example, for every on-demand cloud node size, you could create a @.spot@ variant, like this:
+h2. Configuration
 
-<pre>
-[Size m4.large]
-cores = 2
-scratch = 32000
-
-[Size m4.large.spot]
-cores = 2
-instance_type = m4.large
-preemptible = true
-scratch = 32000
-</pre>
-
-h3. Slurm dispatcher
-
-The @crunch-dispatch-slurm@ service needs a matching instance type configuration on @/etc/arvados/config.yml@, following the previous example:
+To use preemptible instances, set @UsePreemptibleInstances: true@ and add entries with @Preemptible: true@ to @InstanceTypes@ in @config.yml@.  Typically you want both a preemptible and a non-preemptible entry for each cloud provider VM type.  The @Price@ for a preemptible instance is the maximum bid price; the actual price paid is dynamic and may be lower.  For example:
 
 <pre>
 Clusters:
   uuid_prefix:
+    Containers:
+      UsePreemptibleInstances: true
     InstanceTypes:
-    - Name: m4.large
-      VCPUs: 2
-      RAM: 7782000000
-      Scratch: 32000000000
-      Price: 0.1
-    - Name: m4.large.spot
-      Preemptible: true
-      VCPUs: 2
-      RAM: 7782000000
-      Scratch: 32000000000
-      Price: 0.1
+      m4.large:
+        Preemptible: false
+        ProviderType: m4.large
+        VCPUs: 2
+        RAM: 8GiB
+        AddedScratch: 32GB
+        Price: 0.1
+      m4.large.spot:
+        Preemptible: true
+        ProviderType: m4.large
+        VCPUs: 2
+        RAM: 8GiB
+        AddedScratch: 32GB
+        Price: 0.1
 </pre>
 
-@InstanceType@ names should match those defined on nodemanager's config file because it's @crunch-dispatch-slurm@'s job to select the instance type and communicate the decision to @nodemanager@ via Slurm.
+When @UsePreemptibleInstances@ is enabled, child containers (workflow steps) will automatically be made preemptible.  Note that because preempting the workflow runner would cancel the entire workflow, the workflow runner runs in a reserved (non-preemptible) instance.
 
-h3. API Server
+If you are using "crunch-dispatch-cloud":{{site.baseurl}}/install/install-dispatch-cloud.html no additional configuration is required.
 
-Container requests will need the @preemptible@ scheduling parameter included, to make the dispatcher request a spot instance. The API Server configuration file includes an option that when active, will auto assign the @preemptible@ parameter to any new child container request if it doesn't have it already. To activate this feature, the following should be added to the @application.yml@ file:
+If you are using the legacy Nodemanager, "see below":#nodemanager .
 
-<pre>
-preemptible_instances: true
-</pre>
+h2. Preemptible instances on AWS
 
-With this configuration active, child container requests should include the @preemptible = false@ parameter at creation time to avoid being scheduled for spot instance usage.
+For general information, see "using Amazon EC2 spot instances":https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-spot-instances.html .
 
-h3. AWS Permissions
+h3. Permissions
 
-When requesting spot instances, Amazon's API may return an authorization error depending on how users and permissions are set on the account. If this is the case check nodemanager's log for:
+When requesting spot instances, Amazon's API may return an authorization error depending on how users and permissions are set on the account. If this is the case, check the logs for this error:
 
 <pre>
 BaseHTTPError: AuthFailure.ServiceLinkedRoleCreationNotPermitted: The provided credentials do not have permission to create the service-linked role for EC2 Spot Instances.
@@ -75,4 +64,20 @@ h3. Cost Tracking
 
 Amazon's Spot instances prices are declared at instance request time and defined by the maximum price that the user is willing to pay per hour. By default, this price is the same amount as the on-demand version of each instance type, and this setting is the one that nodemanager uses for now, as it doesn't include any pricing data to the spot instance request.
 
-The real price that a spot instance has at any point in time is discovered at the end of each usage hour, depending on instance demand. For this reason, AWS provides a data feed subscription to get hourly logs, as described on "Amazon's User Guide":https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-data-feeds.html.
\ No newline at end of file
+The real price that a spot instance has at any point in time is discovered at the end of each usage hour, depending on instance demand. For this reason, AWS provides a data feed subscription to get hourly logs, as described on "Amazon's User Guide":https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-data-feeds.html.
+
+h2(#nodemanager). Nodemanager
+
+If you are using the legacy Nodemanager, its config file must also declare preemptible instance sizes, which must match the API server's @InstanceTypes@:
+
+<pre>
+[Size m4.large]
+cores = 2
+scratch = 32000
+
+[Size m4.large.spot]
+cores = 2
+instance_type = m4.large
+preemptible = true
+scratch = 32000
+</pre>
index 376d7abc07f528c4e87249edf21fe96c364bd796..185999526e5877f25d9769f92384a76096295111 100644 (file)
@@ -30,15 +30,6 @@ Note to developers: Add new items at the top. Include the date, issue number, co
 TODO: extract this information based on git commit messages and generate changelogs / release notes automatically.
 {% endcomment %}
 
-table(table table-bordered table-condensed).
-|_. development|"master":#master|\3.|
-|_. latest stable|"v1.4.0":#v1_4_0|\3.|
-|_\5. past stable|
-|"v1.3.3":#v1_3_3|"v1.3.0":#v1_3_0|\3.|
-|"v1.2.1":#v1_2_1|"v1.2.0":#v1_2_0|\3.|
-|"v1.1.4":#v1_1_4|"v1.1.3":#v1_1_3|"v1.1.2":#v1_1_2|"v1.1.1":#v1_1_1|"v1.1.0":#v1_1_0|
-|\5. "older":#older|
-
 h3(#master). development master (as of 2019-08-12)
 
 h4. Delete "keep_services" records
index b21c4bd2b2336477d2d18cd4cb18ccb187ec6560..15fbe11622e845fb79f206b8c50a412045d2a6ff 100644 (file)
@@ -17,7 +17,7 @@ table(table table-bordered table-condensed).
 |"Keep-balance":install-keep-balance.html |Storage cluster maintenance daemon responsible for moving blocks to their optimal server location, adjusting block replication levels, and trashing unreferenced blocks.|Required to free deleted data from underlying storage, and to ensure proper replication and block distribution (including support for storage classes).|
 |\3=. *User interface*|
 |"Single Sign On server":install-sso.html |Login server.|Required for web based login to Workbench.|
-|"Workbench":install-workbench-app.html |Primary graphical user interface for working with file collections and running containers.|Optional.  Depends on API server, SSO server, keep-web, websockets server.|
+|"Workbench":install-workbench-app.html, "Workbench2":install-workbench2-app.html |Primary graphical user interface for working with file collections and running containers.|Optional.  Depends on API server, SSO server, keep-web, websockets server.|
 |"Workflow Composer":install-composer.html |Graphical user interface for editing Common Workflow Language workflows.|Optional.  Depends on git server (arv-git-httpd).|
 |\3=. *Additional services*|
 |"Websockets server":install-ws.html |Event distribution server.|Required to view streaming container logs in Workbench.|
index 902ced0372a972449db08452b59c888a47f1d237..a8833f44da20b8492227b46c7470a0cf5a26bfa2 100644 (file)
@@ -100,7 +100,7 @@ server {
 If you restrict access to your Arvados services based on network topology -- for example, your proxy server is not reachable from the public internet -- additional proxy configuration might be needed to thwart cross-site scripting attacks that would circumvent your restrictions. Read the "'Intranet mode' section of the Keep-web documentation":https://godoc.org/github.com/curoverse/arvados/services/keep-web#hdr-Intranet_mode now.
 {% include 'notebox_end' %}
 
-h3. Configure DNS
+h3(#dns). Configure DNS
 
 Configure your DNS servers so the following names resolve to your Nginx proxy's public IP address.
 * @download.uuid_prefix.your.domain@
diff --git a/doc/install/install-workbench2-app.html.textile.liquid b/doc/install/install-workbench2-app.html.textile.liquid
new file mode 100644 (file)
index 0000000..6b94c8f
--- /dev/null
@@ -0,0 +1,93 @@
+---
+layout: default
+navsection: installguide
+title: Install Workbench2 (beta)
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Workbench2 is the web-based user interface for Arvados.
+
+{% include 'notebox_begin' %}
+Workbench2 is the replacement for Arvados Workbench. Workbench2 is currently in <i>beta</i> and is not yet feature complete.
+{% include 'notebox_end' %}
+
+h2(#install_workbench). Install Workbench2 and dependencies
+
+Workbench2 does not require its own database. It is a set of HTML, JavaScript, and CSS files that are served as static files from a web server such as Nginx or Apache2.
+
+On a Debian-based system, install the following package:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install arvados-workbench2</span>
+</code></pre>
+</notextile>
+
+On a Red Hat-based system, install the following package:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install arvados-workbench2</span>
+</code></pre>
+</notextile>
+
+h2. Set up Web server
+
+Workbench2 consists entirely of static files. For best performance, we recommend serving them with Nginx. To do that:
+
+<notextile>
+<ol>
+<li>Install Nginx</li>
+
+<li><p>Edit the http section of your Nginx configuration to serve Workbench2's files. You might add a block like the following, adding SSL and logging parameters to taste:</p>
+
+<pre><code>server {
+  listen       <span class="userinput">[your public IP address]</span>:443 ssl;
+  server_name  workbench2.<span class="userinput">uuid-prefix.your.domain</span>;
+
+  ssl on;
+  ssl_certificate     <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
+  ssl_certificate_key <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
+
+  index  index.html;
+
+  # Workbench2 uses a call to /config.json to bootstrap itself and talk to the desired API server
+  location /config.json {
+    return 200 '{ "API_HOST": "<span class="userinput">uuid-prefix.your.domain</span>" }';
+  }
+
+  location / {
+    root      /var/www/arvados-workbench2/workbench2;
+    index     index.html;
+    try_files $uri $uri/ /index.html;
+    if (-f $document_root/maintenance.html) {
+      return 503;
+    }
+  }
+}
+</code></pre>
+</li>
+
+<li>Restart Nginx.</li>
+
+</ol>
+</notextile>
+
+h2. Trusted client setting
+
+Log in to Workbench2 once to ensure that the Arvados API server has a record of the Workbench2 client.
+
+In the <strong>API server</strong> project root, start the Rails console.  {% include 'install_rails_command' %}
+
+At the console, enter the following commands to locate the ApiClient record for your Workbench2 installation (typically, while you're setting this up, the @last@ one in the database is the one you want), then set the @is_trusted@ flag for the appropriate client record:
+
+<notextile><pre><code>irb(main):001:0&gt; <span class="userinput">wb = ApiClient.all.last; [wb.url_prefix, wb.created_at]</span>
+=&gt; ["https://workbench2.<span class="userinput">uuid_prefix.your.domain</span>/", Sat, 20 Apr 2019 01:23:45 UTC +00:00]
+irb(main):002:0&gt; <span class="userinput">include CurrentApiClient</span>
+=&gt; true
+irb(main):003:0&gt; <span class="userinput">act_as_system_user do wb.update_attributes!(is_trusted: true) end</span>
+=&gt; true
+</code></pre>
+</notextile>
index 52856c8438c6d672eb47d4ee631a913a0d59cfcd..fee8503df812492357ebf813240fc8c8abda4596 100644 (file)
@@ -493,8 +493,21 @@ Clusters:
     Login:
       # These settings are provided by your OAuth2 provider (eg
       # Google) used to perform upstream authentication.
-      ProviderAppSecret: ""
       ProviderAppID: ""
+      ProviderAppSecret: ""
+
+      # (Experimental) Authenticate with Google, bypassing the
+      # SSO-provider gateway service. Use the Google Cloud console to
+      # generate the Client ID and secret (APIs and Services >
+      # Credentials > Create credentials > OAuth client ID > Web
+      # application) and add your controller's /login URL (e.g.,
+      # "https://zzzzz.example.com/login") as an authorized redirect
+      # URL.
+      #
+      # Requires EnableBetaController14287. ProviderAppID must be
+      # blank.
+      GoogleClientID: ""
+      GoogleClientSecret: ""
 
       # The cluster ID to delegate the user database.  When set,
       # logins on this cluster will be redirected to the login cluster
@@ -1060,7 +1073,7 @@ Clusters:
       # Workbench welcome screen, this is HTML text that will be
       # incorporated directly onto the page.
       WelcomePageHTML: |
-        <img src="arvados-logo-big.png" style="width: 20%; float: right; padding: 1em;" />
+        <img src="/arvados-logo-big.png" style="width: 20%; float: right; padding: 1em;" />
         <h2>Please log in.</h2>
 
         <p>The "Log in" button below will show you a sign-in
@@ -1075,5 +1088,12 @@ Clusters:
         identification, and does not retrieve any other personal
         information.</i>
 
+      InactivePageHTML: |
+        <img src="/arvados-logo-big.png" style="width: 20%; float: right; padding: 1em;" />
+        <h3>Hi! You're logged in, but...</h3>
+        <p>Your account is inactive.</p>
+        <p>An administrator must activate your account before you can get
+        any further.</p>
+
     # Use experimental controller code (see https://dev.arvados.org/issues/14287)
     EnableBetaController14287: false
index cd58868ed105a6143205b79e21a47147360307cd..0dd90ff65c7d0bf2ab86bacafed780d349bc8498 100644 (file)
@@ -130,8 +130,10 @@ var whitelist = map[string]bool{
        "InstanceTypes.*":                              true,
        "InstanceTypes.*.*":                            true,
        "Login":                                        true,
-       "Login.ProviderAppSecret":                      false,
+       "Login.GoogleClientID":                         false,
+       "Login.GoogleClientSecret":                     false,
        "Login.ProviderAppID":                          false,
+       "Login.ProviderAppSecret":                      false,
        "Login.LoginCluster":                           true,
        "Login.RemoteTokenRefresh":                     true,
        "Mail":                                         false,
@@ -208,6 +210,7 @@ var whitelist = map[string]bool{
        "Workbench.UserProfileFormMessage":             true,
        "Workbench.VocabularyURL":                      true,
        "Workbench.WelcomePageHTML":                    true,
+       "Workbench.InactivePageHTML":                   true,
 }
 
 func redactUnsafe(m map[string]interface{}, mPrefix, lookupPrefix string) error {
index 6e7aba81dfc84db1f9b8d1c807b2aa2cf1bb566a..42beb066344545f6faa2e9a1c30c6dc8b744578e 100644 (file)
@@ -499,8 +499,21 @@ Clusters:
     Login:
       # These settings are provided by your OAuth2 provider (eg
       # Google) used to perform upstream authentication.
-      ProviderAppSecret: ""
       ProviderAppID: ""
+      ProviderAppSecret: ""
+
+      # (Experimental) Authenticate with Google, bypassing the
+      # SSO-provider gateway service. Use the Google Cloud console to
+      # generate the Client ID and secret (APIs and Services >
+      # Credentials > Create credentials > OAuth client ID > Web
+      # application) and add your controller's /login URL (e.g.,
+      # "https://zzzzz.example.com/login") as an authorized redirect
+      # URL.
+      #
+      # Requires EnableBetaController14287. ProviderAppID must be
+      # blank.
+      GoogleClientID: ""
+      GoogleClientSecret: ""
 
       # The cluster ID to delegate the user database.  When set,
       # logins on this cluster will be redirected to the login cluster
@@ -1066,7 +1079,7 @@ Clusters:
       # Workbench welcome screen, this is HTML text that will be
       # incorporated directly onto the page.
       WelcomePageHTML: |
-        <img src="arvados-logo-big.png" style="width: 20%; float: right; padding: 1em;" />
+        <img src="/arvados-logo-big.png" style="width: 20%; float: right; padding: 1em;" />
         <h2>Please log in.</h2>
 
         <p>The "Log in" button below will show you a sign-in
@@ -1081,6 +1094,13 @@ Clusters:
         identification, and does not retrieve any other personal
         information.</i>
 
+      InactivePageHTML: |
+        <img src="/arvados-logo-big.png" style="width: 20%; float: right; padding: 1em;" />
+        <h3>Hi! You're logged in, but...</h3>
+        <p>Your account is inactive.</p>
+        <p>An administrator must activate your account before you can get
+        any further.</p>
+
     # Use experimental controller code (see https://dev.arvados.org/issues/14287)
     EnableBetaController14287: false
 `)
index 3bcafacd2c8bc47bdce87bb6908169c710662c6d..3829d0a40adab273f7ea126ac8ff40d92527fc46 100644 (file)
@@ -17,7 +17,7 @@ import (
        "strings"
 
        "git.curoverse.com/arvados.git/lib/config"
-       "git.curoverse.com/arvados.git/lib/controller/railsproxy"
+       "git.curoverse.com/arvados.git/lib/controller/localdb"
        "git.curoverse.com/arvados.git/lib/controller/rpc"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/auth"
@@ -31,7 +31,7 @@ type Conn struct {
 }
 
 func New(cluster *arvados.Cluster) *Conn {
-       local := railsproxy.NewConn(cluster)
+       local := localdb.NewConn(cluster)
        remotes := map[string]backend{}
        for id, remote := range cluster.RemoteClusters {
                if !remote.Proxy {
@@ -185,6 +185,30 @@ func (conn *Conn) ConfigGet(ctx context.Context) (json.RawMessage, error) {
        return json.RawMessage(buf.Bytes()), err
 }
 
+func (conn *Conn) Login(ctx context.Context, options arvados.LoginOptions) (arvados.LoginResponse, error) {
+       if id := conn.cluster.Login.LoginCluster; id != "" && id != conn.cluster.ClusterID {
+               // defer entire login procedure to designated cluster
+               remote, ok := conn.remotes[id]
+               if !ok {
+                       return arvados.LoginResponse{}, fmt.Errorf("configuration problem: designated login cluster %q is not defined", id)
+               }
+               baseURL := remote.BaseURL()
+               target, err := baseURL.Parse(arvados.EndpointLogin.Path)
+               if err != nil {
+                       return arvados.LoginResponse{}, fmt.Errorf("internal error getting redirect target: %s", err)
+               }
+               target.RawQuery = url.Values{
+                       "return_to": []string{options.ReturnTo},
+                       "remote":    []string{options.Remote},
+               }.Encode()
+               return arvados.LoginResponse{
+                       RedirectLocation: target.String(),
+               }, nil
+       } else {
+               return conn.local.Login(ctx, options)
+       }
+}
+
 func (conn *Conn) CollectionGet(ctx context.Context, options arvados.GetOptions) (arvados.Collection, error) {
        if len(options.UUID) == 27 {
                // UUID is really a UUID
@@ -291,7 +315,10 @@ func (conn *Conn) APIClientAuthorizationCurrent(ctx context.Context, options arv
        return conn.chooseBackend(options.UUID).APIClientAuthorizationCurrent(ctx, options)
 }
 
-type backend interface{ arvados.API }
+type backend interface {
+       arvados.API
+       BaseURL() url.URL
+}
 
 type notFoundError struct{}
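
A minimal, self-contained sketch (not part of the change) of the redirect URL the Login delegation above constructs when Login.LoginCluster points at another cluster. The host, the return_to value, and the assumption that arvados.EndpointLogin.Path is "login" are all illustrative:

	package main

	import (
		"fmt"
		"net/url"
	)

	func main() {
		// stands in for remote.BaseURL() of the designated login cluster
		base, _ := url.Parse("https://zhome.example.com/")
		target, _ := base.Parse("login") // arvados.EndpointLogin.Path (assumed)
		target.RawQuery = url.Values{
			"return_to": {"https://app.example.com/foo"},
			"remote":    {""},
		}.Encode()
		fmt.Println(target.String())
		// https://zhome.example.com/login?remote=&return_to=https%3A%2F%2Fapp.example.com%2Ffoo
	}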
 
index 414870d24f5c42912785c4b29efa1fe4f91ec786..6ba184c471a1f4df37493520d7df9c46c7fcfaf7 100644 (file)
@@ -139,6 +139,13 @@ func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions
                }
        }
 
+       if matchAllFilters == nil {
+               // Not filtering by UUID at all; just query the local
+               // cluster.
+               _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
+               return err
+       }
+
        // Collate UUIDs in matchAllFilters by remote cluster ID --
        // e.g., todoByRemote["aaaaa"]["aaaaa-4zz18-000000000000000"]
        // will be true -- and count the total number of UUIDs we're
index b28609c2d5e961ea2c38188c9f13e91f941bc87f..c9b981fc15fed27d5fb75131894a2cff6cd77a41 100644 (file)
@@ -59,14 +59,14 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
        s.fed = New(s.cluster)
 }
 
-func (s *FederationSuite) addDirectRemote(c *check.C, id string, backend arvados.API) {
+func (s *FederationSuite) addDirectRemote(c *check.C, id string, backend backend) {
        s.cluster.RemoteClusters[id] = arvados.RemoteCluster{
                Host: "in-process.local",
        }
        s.fed.remotes[id] = backend
 }
 
-func (s *FederationSuite) addHTTPRemote(c *check.C, id string, backend arvados.API) {
+func (s *FederationSuite) addHTTPRemote(c *check.C, id string, backend backend) {
        srv := httpserver.Server{Addr: ":"}
        srv.Handler = router.New(backend)
        c.Check(srv.Start(), check.IsNil)
@@ -117,6 +117,9 @@ func (cl *collectionLister) CollectionList(ctx context.Context, options arvados.
                if cl.MaxPageSize > 0 && len(resp.Items) >= cl.MaxPageSize {
                        break
                }
+               if options.Limit >= 0 && len(resp.Items) >= options.Limit {
+                       break
+               }
                if cl.matchFilters(c, options.Filters) {
                        resp.Items = append(resp.Items, c)
                }
@@ -173,6 +176,15 @@ type listTrial struct {
        expectStatus int
 }
 
+func (s *CollectionListSuite) TestCollectionListNoUUIDFilters(c *check.C) {
+       s.test(c, listTrial{
+               count:       "none",
+               limit:       1,
+               expectUUIDs: []string{s.uuids[0][0]},
+               expectCalls: []int{1, 0, 0},
+       })
+}
+
 func (s *CollectionListSuite) TestCollectionListOneLocal(c *check.C) {
        s.test(c, listTrial{
                count:       "none",
@@ -433,6 +445,6 @@ func (s *CollectionListSuite) test(c *check.C, trial listTrial) {
                        continue
                }
                opts := calls[0].Options.(arvados.ListOptions)
-               c.Check(opts.Limit, check.Equals, -1)
+               c.Check(opts.Limit, check.Equals, trial.limit)
        }
 }
diff --git a/lib/controller/federation/login_test.go b/lib/controller/federation/login_test.go
new file mode 100644 (file)
index 0000000..e001014
--- /dev/null
@@ -0,0 +1,32 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package federation
+
+import (
+       "context"
+       "net/url"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       check "gopkg.in/check.v1"
+)
+
+func (s *FederationSuite) TestDeferToLoginCluster(c *check.C) {
+       s.addHTTPRemote(c, "zhome", &arvadostest.APIStub{})
+       s.cluster.Login.LoginCluster = "zhome"
+
+       returnTo := "https://app.example.com/foo?bar"
+       for _, remote := range []string{"", "ccccc"} {
+               resp, err := s.fed.Login(context.Background(), arvados.LoginOptions{Remote: remote, ReturnTo: returnTo})
+               c.Check(err, check.IsNil)
+               c.Logf("remote %q -- RedirectLocation %q", remote, resp.RedirectLocation)
+               target, err := url.Parse(resp.RedirectLocation)
+               c.Check(err, check.IsNil)
+               c.Check(target.Host, check.Equals, s.cluster.RemoteClusters["zhome"].Host)
+               c.Check(target.Scheme, check.Equals, "http")
+               c.Check(target.Query().Get("remote"), check.Equals, remote)
+               c.Check(target.Query().Get("return_to"), check.Equals, returnTo)
+       }
+}
index f7b2362f371e71b99a196d4b795e54a927e81919..f925233ba36ddfdddab0b26b3ea16db772e2877a 100644 (file)
@@ -83,6 +83,7 @@ func (h *Handler) setup() {
        if h.Cluster.EnableBetaController14287 {
                mux.Handle("/arvados/v1/collections", rtr)
                mux.Handle("/arvados/v1/collections/", rtr)
+               mux.Handle("/login", rtr)
        }
 
        hs := http.NotFoundHandler()
index 5dc0b1e86f8f1ff66d689f19ad6ab7d7b699a3de..ebadc5d0213a25f1fd21123a48b4456b042d4b52 100644 (file)
@@ -165,11 +165,18 @@ func (s *HandlerSuite) TestProxyNotFound(c *check.C) {
 }
 
 func (s *HandlerSuite) TestProxyRedirect(c *check.C) {
+       s.cluster.Login.ProviderAppID = "test"
+       s.cluster.Login.ProviderAppSecret = "test"
        req := httptest.NewRequest("GET", "https://0.0.0.0:1/login?return_to=foo", nil)
        resp := httptest.NewRecorder()
        s.handler.ServeHTTP(resp, req)
-       c.Check(resp.Code, check.Equals, http.StatusFound)
-       c.Check(resp.Header().Get("Location"), check.Matches, `https://0.0.0.0:1/auth/joshid\?return_to=%2Cfoo&?`)
+       if !c.Check(resp.Code, check.Equals, http.StatusFound) {
+               c.Log(resp.Body.String())
+       }
+       // Old "proxy entire request" code path returns an absolute
+       // URL. New lib/controller/federation code path returns a
+       // relative URL.
+       c.Check(resp.Header().Get("Location"), check.Matches, `(https://0.0.0.0:1)?/auth/joshid\?return_to=%2Cfoo&?`)
 }
 
 func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
diff --git a/lib/controller/localdb/conn.go b/lib/controller/localdb/conn.go
new file mode 100644 (file)
index 0000000..835ab43
--- /dev/null
@@ -0,0 +1,43 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+       "context"
+       "errors"
+
+       "git.curoverse.com/arvados.git/lib/controller/railsproxy"
+       "git.curoverse.com/arvados.git/lib/controller/rpc"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type railsProxy = rpc.Conn
+
+type Conn struct {
+       cluster     *arvados.Cluster
+       *railsProxy // handles API methods that aren't defined on Conn itself
+
+       googleLoginController
+}
+
+func NewConn(cluster *arvados.Cluster) *Conn {
+       return &Conn{
+               cluster:    cluster,
+               railsProxy: railsproxy.NewConn(cluster),
+       }
+}
+
+func (conn *Conn) Login(ctx context.Context, opts arvados.LoginOptions) (arvados.LoginResponse, error) {
+       wantGoogle := conn.cluster.Login.GoogleClientID != ""
+       wantSSO := conn.cluster.Login.ProviderAppID != ""
+       if wantGoogle == wantSSO {
+               return arvados.LoginResponse{}, errors.New("configuration problem: exactly one of Login.GoogleClientID and Login.ProviderAppID must be configured")
+       } else if wantGoogle {
+               return conn.googleLoginController.Login(ctx, conn.cluster, conn.railsProxy, opts)
+       } else {
+               // Proxy to RailsAPI, which hands off to sso-provider.
+               return conn.railsProxy.Login(ctx, opts)
+       }
+}
diff --git a/lib/controller/localdb/login.go b/lib/controller/localdb/login.go
new file mode 100644 (file)
index 0000000..8b83c38
--- /dev/null
@@ -0,0 +1,199 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+       "bytes"
+       "context"
+       "crypto/hmac"
+       "crypto/sha256"
+       "encoding/base64"
+       "errors"
+       "fmt"
+       "net/url"
+       "strings"
+       "sync"
+       "text/template"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/controller/rpc"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "github.com/coreos/go-oidc"
+       "golang.org/x/oauth2"
+)
+
+type googleLoginController struct {
+       issuer   string // override OIDC issuer URL (normally https://accounts.google.com) for testing
+       provider *oidc.Provider
+       mu       sync.Mutex
+}
+
+func (ctrl *googleLoginController) getProvider() (*oidc.Provider, error) {
+       ctrl.mu.Lock()
+       defer ctrl.mu.Unlock()
+       if ctrl.provider == nil {
+               issuer := ctrl.issuer
+               if issuer == "" {
+                       issuer = "https://accounts.google.com"
+               }
+               provider, err := oidc.NewProvider(context.Background(), issuer)
+               if err != nil {
+                       return nil, err
+               }
+               ctrl.provider = provider
+       }
+       return ctrl.provider, nil
+}
+
+func (ctrl *googleLoginController) Login(ctx context.Context, cluster *arvados.Cluster, railsproxy *railsProxy, opts arvados.LoginOptions) (arvados.LoginResponse, error) {
+       provider, err := ctrl.getProvider()
+       if err != nil {
+               return ctrl.loginError(fmt.Errorf("error setting up OpenID Connect provider: %s", err))
+       }
+       redirURL, err := (*url.URL)(&cluster.Services.Controller.ExternalURL).Parse("/login")
+       if err != nil {
+               return ctrl.loginError(fmt.Errorf("error making redirect URL: %s", err))
+       }
+       conf := &oauth2.Config{
+               ClientID:     cluster.Login.GoogleClientID,
+               ClientSecret: cluster.Login.GoogleClientSecret,
+               Endpoint:     provider.Endpoint(),
+               Scopes:       []string{oidc.ScopeOpenID, "profile", "email"},
+               RedirectURL:  redirURL.String(),
+       }
+       verifier := provider.Verifier(&oidc.Config{
+               ClientID: conf.ClientID,
+       })
+       if opts.State == "" {
+               // Initiate Google sign-in.
+               if opts.ReturnTo == "" {
+                       return ctrl.loginError(errors.New("missing return_to parameter"))
+               }
+               me := url.URL(cluster.Services.Controller.ExternalURL)
+               callback, err := me.Parse("/" + arvados.EndpointLogin.Path)
+               if err != nil {
+                       return ctrl.loginError(err)
+               }
+               conf.RedirectURL = callback.String()
+               state := ctrl.newOAuth2State([]byte(cluster.SystemRootToken), opts.Remote, opts.ReturnTo)
+               return arvados.LoginResponse{
+                       RedirectLocation: conf.AuthCodeURL(state.String(),
+                               // prompt=select_account tells Google
+                               // to show the "choose which Google
+                               // account" page, even if the client
+                               // is currently logged in to exactly
+                               // one Google account.
+                               oauth2.SetAuthURLParam("prompt", "select_account")),
+               }, nil
+       } else {
+               // Callback after Google sign-in.
+               state := ctrl.parseOAuth2State(opts.State)
+               if !state.verify([]byte(cluster.SystemRootToken)) {
+                       return ctrl.loginError(errors.New("invalid OAuth2 state"))
+               }
+               oauth2Token, err := conf.Exchange(ctx, opts.Code)
+               if err != nil {
+                       return ctrl.loginError(fmt.Errorf("error in OAuth2 exchange: %s", err))
+               }
+               rawIDToken, ok := oauth2Token.Extra("id_token").(string)
+               if !ok {
+                       return ctrl.loginError(errors.New("error in OAuth2 exchange: no ID token in OAuth2 token"))
+               }
+               idToken, err := verifier.Verify(ctx, rawIDToken)
+               if err != nil {
+                       return ctrl.loginError(fmt.Errorf("error verifying ID token: %s", err))
+               }
+               var claims struct {
+                       Name     string `json:"name"`
+                       Email    string `json:"email"`
+                       Verified bool   `json:"email_verified"`
+               }
+               if err := idToken.Claims(&claims); err != nil {
+                       return ctrl.loginError(fmt.Errorf("error extracting claims from ID token: %s", err))
+               }
+               if !claims.Verified {
+                       return ctrl.loginError(errors.New("cannot authenticate using an unverified email address"))
+               }
+
+               firstname, lastname := strings.TrimSpace(claims.Name), ""
+               if names := strings.Fields(firstname); len(names) > 1 {
+                       firstname = strings.Join(names[0:len(names)-1], " ")
+                       lastname = names[len(names)-1]
+               }
+
+               ctxRoot := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{cluster.SystemRootToken}})
+               return railsproxy.UserSessionCreate(ctxRoot, rpc.UserSessionCreateOptions{
+                       ReturnTo: state.Remote + "," + state.ReturnTo,
+                       AuthInfo: map[string]interface{}{
+                               "email":      claims.Email,
+                               "first_name": firstname,
+                               "last_name":  lastname,
+                       },
+               })
+       }
+}
+
+func (ctrl *googleLoginController) loginError(sendError error) (resp arvados.LoginResponse, err error) {
+       tmpl, err := template.New("error").Parse(`<h2>Login error:</h2><p>{{.}}</p>`)
+       if err != nil {
+               return
+       }
+       err = tmpl.Execute(&resp.HTML, sendError.Error())
+       return
+}
+
+func (ctrl *googleLoginController) newOAuth2State(key []byte, remote, returnTo string) oauth2State {
+       s := oauth2State{
+               Time:     time.Now().Unix(),
+               Remote:   remote,
+               ReturnTo: returnTo,
+       }
+       s.HMAC = s.computeHMAC(key)
+       return s
+}
+
+type oauth2State struct {
+       HMAC     []byte // hash of other fields; see computeHMAC()
+       Time     int64  // creation time (unix timestamp)
+       Remote   string // remote cluster if requesting a salted token, otherwise blank
+       ReturnTo string // redirect target
+}
+
+func (ctrl *googleLoginController) parseOAuth2State(encoded string) (s oauth2State) {
+       // Errors are not checked. If decoding/parsing fails, the
+       // token will be rejected by verify().
+       decoded, _ := base64.RawURLEncoding.DecodeString(encoded)
+       f := strings.Split(string(decoded), "\n")
+       if len(f) != 4 {
+               return
+       }
+       fmt.Sscanf(f[0], "%x", &s.HMAC)
+       fmt.Sscanf(f[1], "%x", &s.Time)
+       fmt.Sscanf(f[2], "%s", &s.Remote)
+       fmt.Sscanf(f[3], "%s", &s.ReturnTo)
+       return
+}
+
+func (s oauth2State) verify(key []byte) bool {
+       if delta := time.Now().Unix() - s.Time; delta < 0 || delta > 300 {
+               return false
+       }
+       return hmac.Equal(s.computeHMAC(key), s.HMAC)
+}
+
+func (s oauth2State) String() string {
+       var buf bytes.Buffer
+       enc := base64.NewEncoder(base64.RawURLEncoding, &buf)
+       fmt.Fprintf(enc, "%x\n%x\n%s\n%s", s.HMAC, s.Time, s.Remote, s.ReturnTo)
+       enc.Close()
+       return buf.String()
+}
+
+func (s oauth2State) computeHMAC(key []byte) []byte {
+       mac := hmac.New(sha256.New, key)
+       fmt.Fprintf(mac, "%x %s %s", s.Time, s.Remote, s.ReturnTo)
+       return mac.Sum(nil)
+}
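
A short sketch (not part of the change) of how the OAuth2 state parameter round-trips through the helpers above. It assumes it lives in package localdb, since the helpers are unexported; the remote ID and return_to URL are made up:

	func exampleOAuth2StateRoundTrip(systemRootToken string) bool {
		key := []byte(systemRootToken)
		var ctrl googleLoginController
		state := ctrl.newOAuth2State(key, "zzzzz", "https://app.example.com/foo")
		encoded := state.String()                 // embedded in the AuthCodeURL "state" parameter
		decoded := ctrl.parseOAuth2State(encoded) // recovered from opts.State in the callback
		// true while the HMAC matches and the state is less than 5 minutes old
		return decoded.verify(key)
	}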
diff --git a/lib/controller/localdb/login_test.go b/lib/controller/localdb/login_test.go
new file mode 100644 (file)
index 0000000..362e258
--- /dev/null
@@ -0,0 +1,259 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+       "bytes"
+       "context"
+       "crypto/rand"
+       "crypto/rsa"
+       "encoding/json"
+       "fmt"
+       "net/http"
+       "net/http/httptest"
+       "net/url"
+       "strings"
+       "testing"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/config"
+       "git.curoverse.com/arvados.git/lib/controller/rpc"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       check "gopkg.in/check.v1"
+       jose "gopkg.in/square/go-jose.v2"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&LoginSuite{})
+
+type LoginSuite struct {
+       cluster    *arvados.Cluster
+       ctx        context.Context
+       localdb    *Conn
+       railsSpy   *arvadostest.Proxy
+       fakeIssuer *httptest.Server
+       issuerKey  *rsa.PrivateKey
+
+       // expected token request
+       validCode string
+       // desired response from token endpoint
+       authEmail         string
+       authEmailVerified bool
+       authName          string
+}
+
+func (s *LoginSuite) SetUpTest(c *check.C) {
+       var err error
+       s.issuerKey, err = rsa.GenerateKey(rand.Reader, 2048)
+       c.Assert(err, check.IsNil)
+
+       s.authEmail = "active-user@arvados.local"
+       s.authEmailVerified = true
+       s.authName = "Fake User Name"
+       s.fakeIssuer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               req.ParseForm()
+               c.Logf("fakeIssuer: got req: %s %s %s", req.Method, req.URL, req.Form)
+               w.Header().Set("Content-Type", "application/json")
+               switch req.URL.Path {
+               case "/.well-known/openid-configuration":
+                       json.NewEncoder(w).Encode(map[string]interface{}{
+                               "issuer":                 s.fakeIssuer.URL,
+                               "authorization_endpoint": s.fakeIssuer.URL + "/auth",
+                               "token_endpoint":         s.fakeIssuer.URL + "/token",
+                               "jwks_uri":               s.fakeIssuer.URL + "/jwks",
+                               "userinfo_endpoint":      s.fakeIssuer.URL + "/userinfo",
+                       })
+               case "/token":
+                       if req.Form.Get("code") != s.validCode || s.validCode == "" {
+                               w.WriteHeader(http.StatusUnauthorized)
+                               return
+                       }
+                       idToken, _ := json.Marshal(map[string]interface{}{
+                               "iss":            s.fakeIssuer.URL,
+                               "aud":            []string{"test%client$id"},
+                               "sub":            "fake-user-id",
+                               "exp":            time.Now().UTC().Add(time.Minute).UnixNano(),
+                               "iat":            time.Now().UTC().UnixNano(),
+                               "nonce":          "fake-nonce",
+                               "email":          s.authEmail,
+                               "email_verified": s.authEmailVerified,
+                               "name":           s.authName,
+                       })
+                       json.NewEncoder(w).Encode(struct {
+                               AccessToken  string `json:"access_token"`
+                               TokenType    string `json:"token_type"`
+                               RefreshToken string `json:"refresh_token"`
+                               ExpiresIn    int32  `json:"expires_in"`
+                               IDToken      string `json:"id_token"`
+                       }{
+                               AccessToken:  s.fakeToken(c, []byte("fake access token")),
+                               TokenType:    "Bearer",
+                               RefreshToken: "test-refresh-token",
+                               ExpiresIn:    30,
+                               IDToken:      s.fakeToken(c, idToken),
+                       })
+               case "/jwks":
+                       json.NewEncoder(w).Encode(jose.JSONWebKeySet{
+                               Keys: []jose.JSONWebKey{
+                                       {Key: s.issuerKey.Public(), Algorithm: string(jose.RS256), KeyID: ""},
+                               },
+                       })
+               case "/auth":
+                       w.WriteHeader(http.StatusInternalServerError)
+               case "/userinfo":
+                       w.WriteHeader(http.StatusInternalServerError)
+               default:
+                       w.WriteHeader(http.StatusNotFound)
+               }
+       }))
+
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       s.cluster, err = cfg.GetCluster("")
+       s.cluster.Login.GoogleClientID = "test%client$id"
+       s.cluster.Login.GoogleClientSecret = "test#client/secret"
+       c.Assert(err, check.IsNil)
+
+       s.localdb = NewConn(s.cluster)
+       s.localdb.googleLoginController.issuer = s.fakeIssuer.URL
+
+       s.railsSpy = arvadostest.NewProxy(c, s.cluster.Services.RailsAPI)
+       s.localdb.railsProxy = rpc.NewConn(s.cluster.ClusterID, s.railsSpy.URL, true, rpc.PassthroughTokenProvider)
+}
+
+func (s *LoginSuite) TearDownTest(c *check.C) {
+       s.railsSpy.Close()
+}
+
+func (s *LoginSuite) TestGoogleLoginStart_Bogus(c *check.C) {
+       resp, err := s.localdb.Login(context.Background(), arvados.LoginOptions{})
+       c.Check(err, check.IsNil)
+       c.Check(resp.RedirectLocation, check.Equals, "")
+       c.Check(resp.HTML.String(), check.Matches, `.*missing return_to parameter.*`)
+}
+
+func (s *LoginSuite) TestGoogleLoginStart(c *check.C) {
+       for _, remote := range []string{"", "zzzzz"} {
+               resp, err := s.localdb.Login(context.Background(), arvados.LoginOptions{Remote: remote, ReturnTo: "https://app.example.com/foo?bar"})
+               c.Check(err, check.IsNil)
+               target, err := url.Parse(resp.RedirectLocation)
+               c.Check(err, check.IsNil)
+               issuerURL, _ := url.Parse(s.fakeIssuer.URL)
+               c.Check(target.Host, check.Equals, issuerURL.Host)
+               q := target.Query()
+               c.Check(q.Get("client_id"), check.Equals, "test%client$id")
+               state := s.localdb.googleLoginController.parseOAuth2State(q.Get("state"))
+               c.Check(state.verify([]byte(s.cluster.SystemRootToken)), check.Equals, true)
+               c.Check(state.Time, check.Not(check.Equals), 0)
+               c.Check(state.Remote, check.Equals, remote)
+               c.Check(state.ReturnTo, check.Equals, "https://app.example.com/foo?bar")
+       }
+}
+
+func (s *LoginSuite) TestGoogleLoginSuccess(c *check.C) {
+       // Initiate login, but instead of following the redirect to
+       // the provider, just grab state from the redirect URL.
+       resp, err := s.localdb.Login(context.Background(), arvados.LoginOptions{ReturnTo: "https://app.example.com/foo?bar"})
+       c.Check(err, check.IsNil)
+       target, err := url.Parse(resp.RedirectLocation)
+       c.Check(err, check.IsNil)
+       state := target.Query().Get("state")
+       c.Check(state, check.Not(check.Equals), "")
+
+       // Prime the fake issuer with a valid code.
+       s.validCode = fmt.Sprintf("abcdefgh-%d", time.Now().Unix())
+
+       // Callback with invalid code.
+       resp, err = s.localdb.Login(context.Background(), arvados.LoginOptions{
+               Code:  "first-try-a-bogus-code",
+               State: state,
+       })
+       c.Check(err, check.IsNil)
+       c.Check(resp.RedirectLocation, check.Equals, "")
+       c.Check(resp.HTML.String(), check.Matches, `(?ms).*error in OAuth2 exchange.*cannot fetch token.*`)
+
+       // Callback with invalid state.
+       resp, err = s.localdb.Login(context.Background(), arvados.LoginOptions{
+               Code:  s.validCode,
+               State: "bogus-state",
+       })
+       c.Check(err, check.IsNil)
+       c.Check(resp.RedirectLocation, check.Equals, "")
+       c.Check(resp.HTML.String(), check.Matches, `(?ms).*invalid OAuth2 state.*`)
+
+       // Callback with valid code and state.
+       resp, err = s.localdb.Login(context.Background(), arvados.LoginOptions{
+               Code:  s.validCode,
+               State: state,
+       })
+       c.Check(err, check.IsNil)
+       c.Check(resp.HTML.String(), check.Equals, "")
+       c.Check(resp.RedirectLocation, check.Not(check.Equals), "")
+       target, err = url.Parse(resp.RedirectLocation)
+       c.Check(err, check.IsNil)
+       c.Check(target.Host, check.Equals, "app.example.com")
+       c.Check(target.Path, check.Equals, "/foo")
+       token := target.Query().Get("api_token")
+       c.Check(token, check.Matches, `v2/zzzzz-gj3su-.{15}/.{32,50}`)
+
+       foundCallback := false
+       for _, dump := range s.railsSpy.RequestDumps {
+               c.Logf("spied request: %q", dump)
+               split := bytes.Split(dump, []byte("\r\n\r\n"))
+               c.Assert(split, check.HasLen, 2)
+               hdr, body := string(split[0]), string(split[1])
+               if strings.Contains(hdr, "POST /auth/controller/callback") {
+                       vs, err := url.ParseQuery(body)
+                       var authinfo map[string]interface{}
+                       c.Check(json.Unmarshal([]byte(vs.Get("auth_info")), &authinfo), check.IsNil)
+                       c.Check(err, check.IsNil)
+                       c.Check(authinfo["first_name"], check.Equals, "Fake User")
+                       c.Check(authinfo["last_name"], check.Equals, "Name")
+                       c.Check(authinfo["email"], check.Equals, "active-user@arvados.local")
+                       foundCallback = true
+               }
+       }
+       c.Check(foundCallback, check.Equals, true)
+
+       // Try using the returned Arvados token.
+       c.Logf("trying an API call with new token %q", token)
+       ctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{token}})
+       cl, err := s.localdb.CollectionList(ctx, arvados.ListOptions{Limit: -1})
+       c.Check(cl.ItemsAvailable, check.Not(check.Equals), 0)
+       c.Check(cl.Items, check.Not(check.HasLen), 0)
+       c.Check(err, check.IsNil)
+
+       // Might as well check that bogus tokens aren't accepted.
+       badtoken := token + "plussomeboguschars"
+       c.Logf("trying an API call with mangled token %q", badtoken)
+       ctx = auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{badtoken}})
+       cl, err = s.localdb.CollectionList(ctx, arvados.ListOptions{Limit: -1})
+       c.Check(cl.Items, check.HasLen, 0)
+       c.Check(err, check.NotNil)
+       c.Check(err, check.ErrorMatches, `.*401 Unauthorized: Not logged in.*`)
+}
+
+func (s *LoginSuite) fakeToken(c *check.C, payload []byte) string {
+       signer, err := jose.NewSigner(jose.SigningKey{Algorithm: jose.RS256, Key: s.issuerKey}, nil)
+       if err != nil {
+               c.Error(err)
+       }
+       object, err := signer.Sign(payload)
+       if err != nil {
+               c.Error(err)
+       }
+       t, err := object.CompactSerialize()
+       if err != nil {
+               c.Error(err)
+       }
+       c.Logf("fakeToken(%q) == %q", payload, t)
+       return t
+}
index 576e603eedd758f8ff53f2556e1161b6957b0691..54257cffc57d9a84eb946b73f3704607c1b9c45e 100644 (file)
@@ -7,15 +7,13 @@
 package railsproxy
 
 import (
-       "context"
-       "errors"
        "fmt"
+       "net/http"
        "net/url"
        "strings"
 
        "git.curoverse.com/arvados.git/lib/controller/rpc"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/auth"
 )
 
 // For now, FindRailsAPI always uses the rails API running on this
@@ -40,13 +38,10 @@ func NewConn(cluster *arvados.Cluster) *rpc.Conn {
        if err != nil {
                panic(err)
        }
-       return rpc.NewConn(cluster.ClusterID, url, insecure, provideIncomingToken)
-}
-
-func provideIncomingToken(ctx context.Context) ([]string, error) {
-       incoming, ok := auth.FromContext(ctx)
-       if !ok {
-               return nil, errors.New("no token provided")
-       }
-       return incoming.Tokens, nil
+       conn := rpc.NewConn(cluster.ClusterID, url, insecure, rpc.PassthroughTokenProvider)
+       // If Rails is running with force_ssl=true, this
+       // "X-Forwarded-Proto: https" header prevents it from
+       // redirecting our internal request to an invalid https URL.
+       conn.SendHeader = http.Header{"X-Forwarded-Proto": []string{"https"}}
+       return conn
 }
index aa3af1f64c45194c5d5b3cc2e1996ed212941e5e..e3ec37a6ea842ac36e6b8d766cf91cf0210f05f9 100644 (file)
@@ -52,9 +52,16 @@ func applySelectParam(selectParam []string, orig map[string]interface{}) map[str
        return selected
 }
 
-func (rtr *router) sendResponse(w http.ResponseWriter, resp interface{}, opts responseOptions) {
+func (rtr *router) sendResponse(w http.ResponseWriter, req *http.Request, resp interface{}, opts responseOptions) {
        var tmp map[string]interface{}
 
+       if resp, ok := resp.(http.Handler); ok {
+               // resp knows how to write its own http response
+               // header and body.
+               resp.ServeHTTP(w, req)
+               return
+       }
+
        err := rtr.transcode(resp, &tmp)
        if err != nil {
                rtr.sendError(w, err)
@@ -121,7 +128,9 @@ func (rtr *router) sendResponse(w http.ResponseWriter, resp interface{}, opts re
                }
        }
        w.Header().Set("Content-Type", "application/json")
-       json.NewEncoder(w).Encode(tmp)
+       enc := json.NewEncoder(w)
+       enc.SetEscapeHTML(false)
+       enc.Encode(tmp)
 }
 
 func (rtr *router) sendError(w http.ResponseWriter, err error) {
index 5d5602df523b672d6e8f6d84346ed5255a20ae76..d3bdce527211e1b26245a820dcbc9cd174f3dc62 100644 (file)
@@ -47,6 +47,13 @@ func (rtr *router) addRoutes() {
                                return rtr.fed.ConfigGet(ctx)
                        },
                },
+               {
+                       arvados.EndpointLogin,
+                       func() interface{} { return &arvados.LoginOptions{} },
+                       func(ctx context.Context, opts interface{}) (interface{}, error) {
+                               return rtr.fed.Login(ctx, *opts.(*arvados.LoginOptions))
+                       },
+               },
                {
                        arvados.EndpointCollectionCreate,
                        func() interface{} { return &arvados.CreateOptions{} },
@@ -263,7 +270,7 @@ func (rtr *router) addRoute(endpoint arvados.APIEndpoint, defaultOpts func() int
                        rtr.sendError(w, err)
                        return
                }
-               rtr.sendResponse(w, resp, respOpts)
+               rtr.sendResponse(w, req, resp, respOpts)
        })
 }
 
index 1028da829fbdb0361fc5b041e98fba5e30c81c6c..cb23c7fad1ea99dce585f3626b66bf9130b82069 100644 (file)
@@ -32,6 +32,7 @@ func PassthroughTokenProvider(ctx context.Context) ([]string, error) {
 }
 
 type Conn struct {
+       SendHeader    http.Header
        clusterID     string
        httpClient    http.Client
        baseURL       url.URL
@@ -61,8 +62,11 @@ func NewConn(clusterID string, url *url.URL, insecure bool, tp TokenProvider) *C
                }
        }
        return &Conn{
-               clusterID:     clusterID,
-               httpClient:    http.Client{Transport: transport},
+               clusterID: clusterID,
+               httpClient: http.Client{
+                       CheckRedirect: func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse },
+                       Transport:     transport,
+               },
                baseURL:       *url,
                tokenProvider: tp,
        }
@@ -70,9 +74,10 @@ func NewConn(clusterID string, url *url.URL, insecure bool, tp TokenProvider) *C
 
 func (conn *Conn) requestAndDecode(ctx context.Context, dst interface{}, ep arvados.APIEndpoint, body io.Reader, opts interface{}) error {
        aClient := arvados.Client{
-               Client:  &conn.httpClient,
-               Scheme:  conn.baseURL.Scheme,
-               APIHost: conn.baseURL.Host,
+               Client:     &conn.httpClient,
+               Scheme:     conn.baseURL.Scheme,
+               APIHost:    conn.baseURL.Host,
+               SendHeader: conn.SendHeader,
        }
        tokens, err := conn.tokenProvider(ctx)
        if err != nil {
@@ -121,6 +126,10 @@ func (conn *Conn) requestAndDecode(ctx context.Context, dst interface{}, ep arva
        return aClient.RequestAndDecodeContext(ctx, dst, ep.Method, path, body, params)
 }
 
+func (conn *Conn) BaseURL() url.URL {
+       return conn.baseURL
+}
+
 func (conn *Conn) ConfigGet(ctx context.Context) (json.RawMessage, error) {
        ep := arvados.EndpointConfigGet
        var resp json.RawMessage
@@ -128,6 +137,30 @@ func (conn *Conn) ConfigGet(ctx context.Context) (json.RawMessage, error) {
        return resp, err
 }
 
+func (conn *Conn) Login(ctx context.Context, options arvados.LoginOptions) (arvados.LoginResponse, error) {
+       ep := arvados.EndpointLogin
+       var resp arvados.LoginResponse
+       err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+       resp.RedirectLocation = conn.relativeToBaseURL(resp.RedirectLocation)
+       return resp, err
+}
+
+// If the given location is a valid URL and its origin is the same as
+// conn.baseURL, return it as a relative URL. Otherwise, return it
+// unmodified.
+func (conn *Conn) relativeToBaseURL(location string) string {
+       u, err := url.Parse(location)
+       if err == nil && u.Scheme == conn.baseURL.Scheme && strings.ToLower(u.Host) == strings.ToLower(conn.baseURL.Host) {
+               u.Opaque = ""
+               u.Scheme = ""
+               u.User = nil
+               u.Host = ""
+               return u.String()
+       } else {
+               return location
+       }
+}
+
 func (conn *Conn) CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error) {
        ep := arvados.EndpointCollectionCreate
        var resp arvados.Collection
@@ -281,3 +314,15 @@ func (conn *Conn) APIClientAuthorizationCurrent(ctx context.Context, options arv
        err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
        return resp, err
 }
+
+type UserSessionCreateOptions struct {
+       AuthInfo map[string]interface{} `json:"auth_info"`
+       ReturnTo string                 `json:"return_to"`
+}
+
+func (conn *Conn) UserSessionCreate(ctx context.Context, options UserSessionCreateOptions) (arvados.LoginResponse, error) {
+       ep := arvados.APIEndpoint{Method: "POST", Path: "auth/controller/callback"}
+       var resp arvados.LoginResponse
+       err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+       return resp, err
+}
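
To make the relativeToBaseURL behavior above concrete: a Location header that points back at the proxied API host is reduced to a path-and-query string, while any other URL passes through unchanged. A standalone sketch using the same comparison (hypothetical URLs):

    package main

    import (
        "fmt"
        "net/url"
        "strings"
    )

    // relativeTo mirrors (*Conn)relativeToBaseURL for illustration only.
    func relativeTo(location string, base url.URL) string {
        u, err := url.Parse(location)
        if err == nil && u.Scheme == base.Scheme && strings.ToLower(u.Host) == strings.ToLower(base.Host) {
            u.Opaque, u.Scheme, u.User, u.Host = "", "", nil, ""
            return u.String()
        }
        return location
    }

    func main() {
        base := url.URL{Scheme: "https", Host: "railsapi.internal:8000"}
        fmt.Println(relativeTo("https://railsapi.internal:8000/login?return_to=x", base)) // "/login?return_to=x"
        fmt.Println(relativeTo("https://accounts.google.com/o/oauth2/auth", base))        // unchanged
    }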
index c30ff9f2b7608410bbcf3450c62708fa3e5e2d09..e819a6036b341d5d2bbe28a242292296b5b36cd2 100644 (file)
@@ -11,7 +11,6 @@ import (
        "syscall"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
 )
 
@@ -20,7 +19,7 @@ import (
 type remoteRunner struct {
        uuid          string
        executor      Executor
-       arvClient     *arvados.Client
+       envJSON       json.RawMessage
        remoteUser    string
        timeoutTERM   time.Duration
        timeoutSignal time.Duration
@@ -36,10 +35,30 @@ type remoteRunner struct {
 // newRemoteRunner returns a new remoteRunner. Caller should ensure
 // Close() is called to release resources.
 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+       // Send the instance type record as a JSON doc so crunch-run
+       // can log it.
+       var instJSON bytes.Buffer
+       enc := json.NewEncoder(&instJSON)
+       enc.SetIndent("", "    ")
+       if err := enc.Encode(wkr.instType); err != nil {
+               panic(err)
+       }
+       env := map[string]string{
+               "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
+               "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+               "InstanceType":      instJSON.String(),
+       }
+       if wkr.wp.arvClient.Insecure {
+               env["ARVADOS_API_HOST_INSECURE"] = "1"
+       }
+       envJSON, err := json.Marshal(env)
+       if err != nil {
+               panic(err)
+       }
        rr := &remoteRunner{
                uuid:          uuid,
                executor:      wkr.executor,
-               arvClient:     wkr.wp.arvClient,
+               envJSON:       envJSON,
                remoteUser:    wkr.instance.RemoteUser(),
                timeoutTERM:   wkr.wp.timeoutTERM,
                timeoutSignal: wkr.wp.timeoutSignal,
@@ -57,22 +76,11 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 // assume the remote process _might_ have started, at least until it
 // probes the worker and finds otherwise.
 func (rr *remoteRunner) Start() {
-       env := map[string]string{
-               "ARVADOS_API_HOST":  rr.arvClient.APIHost,
-               "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
-       }
-       if rr.arvClient.Insecure {
-               env["ARVADOS_API_HOST_INSECURE"] = "1"
-       }
-       envJSON, err := json.Marshal(env)
-       if err != nil {
-               panic(err)
-       }
-       stdin := bytes.NewBuffer(envJSON)
        cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
        if rr.remoteUser != "root" {
                cmd = "sudo " + cmd
        }
+       stdin := bytes.NewBuffer(rr.envJSON)
        stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
        if err != nil {
                rr.logger.WithField("stdout", string(stdout)).
index 4f9ba911cd3c112463f749738dcb70d1dfb47619..943fa7c710bb22af055838239a0eea2ecde11f93 100644 (file)
@@ -25,6 +25,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
        bootTimeout := time.Minute
        probeTimeout := time.Second
 
+       ac := arvados.NewClientFromEnv()
        is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger)
        c.Assert(err, check.IsNil)
        inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
@@ -192,6 +193,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                        "crunch-run --list": trial.respRun,
                }
                wp := &Pool{
+                       arvClient:        ac,
                        newExecutor:      func(cloud.Instance) Executor { return exr },
                        bootProbeCommand: "bootprobe",
                        timeoutBooting:   bootTimeout,
index ccbce3ed3eaee5b05d7cf55f879cacfc0c87e2fb..3dd04040ab5d728b5eac21bab601225135ce810e 100644 (file)
@@ -36,7 +36,7 @@ from .perf import Perf
 from ._version import __version__
 from .executor import ArvCwlExecutor
 
-# These arn't used directly in this file but
+# These aren't used directly in this file but
 # other code expects to import them from here
 from .arvcontainer import ArvadosContainer
 from .arvtool import ArvadosCommandTool
index aa3388d00bc9964eb8eb845f2210ac6eee6510de..52ab2ca4bb5be9e3bca421349eea8bd2ffa28129 100644 (file)
@@ -425,7 +425,7 @@ class RunnerContainer(Runner):
                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                 "API": True
             },
-            "use_existing": self.enable_reuse,
+            "use_existing": False, # Never reuse the runner container - see #15497.
             "properties": {}
         }
 
index d6ef665fb8fb3b89325ef022697c2f74cda05a0f..927e43ad76c1fe1e547fd91dc026282726c82a1c 100644 (file)
@@ -314,7 +314,7 @@ def stubs(func):
                 'vcpus': 1,
                 'ram': (1024+256)*1024*1024
             },
-            'use_existing': True,
+            'use_existing': False,
             'properties': {},
             'secret_mounts': {}
         }
@@ -690,7 +690,7 @@ class TestSubmit(unittest.TestCase):
                 'vcpus': 1,
                 'ram': 1342177280
             },
-            'use_existing': True,
+            'use_existing': False,
             'properties': {},
             'secret_mounts': {}
         }
@@ -785,7 +785,7 @@ class TestSubmit(unittest.TestCase):
                 'vcpus': 1,
                 'ram': 1342177280
             },
-            'use_existing': True,
+            'use_existing': False,
             'properties': {
                 "template_uuid": "962eh-7fd4e-gkbzl62qqtfig37"
             },
@@ -1180,7 +1180,7 @@ class TestSubmit(unittest.TestCase):
                 }
             },
             "state": "Committed",
-            "use_existing": True
+            "use_existing": False
         }
 
         stubs.api.container_requests().create.assert_called_with(
index 772f8da9719ae874d3f392782fb6388d3e74a488..5531cf71d344cbb795caaa2cac670d7a3ff88ca1 100644 (file)
@@ -18,6 +18,7 @@ type APIEndpoint struct {
 
 var (
        EndpointConfigGet                     = APIEndpoint{"GET", "arvados/v1/config", ""}
+       EndpointLogin                         = APIEndpoint{"GET", "login", ""}
        EndpointCollectionCreate              = APIEndpoint{"POST", "arvados/v1/collections", "collection"}
        EndpointCollectionUpdate              = APIEndpoint{"PATCH", "arvados/v1/collections/:uuid", "collection"}
        EndpointCollectionGet                 = APIEndpoint{"GET", "arvados/v1/collections/:uuid", ""}
@@ -83,8 +84,16 @@ type DeleteOptions struct {
        UUID string `json:"uuid"`
 }
 
+type LoginOptions struct {
+       ReturnTo string `json:"return_to"`        // On success, redirect to this target with api_token=xxx query param
+       Remote   string `json:"remote,omitempty"` // Salt token for remote Cluster ID
+       Code     string `json:"code,omitempty"`   // OAuth2 callback code
+       State    string `json:"state,omitempty"`  // OAuth2 callback state
+}
+
 type API interface {
        ConfigGet(ctx context.Context) (json.RawMessage, error)
+       Login(ctx context.Context, options LoginOptions) (LoginResponse, error)
        CollectionCreate(ctx context.Context, options CreateOptions) (Collection, error)
        CollectionUpdate(ctx context.Context, options UpdateOptions) (Collection, error)
        CollectionGet(ctx context.Context, options GetOptions) (Collection, error)
index a5815987b192a86c9ee646205bcc9ea0f7986dcc..8545cb969d92f8fbc716175d5c9820f47ed6680a 100644 (file)
@@ -54,6 +54,9 @@ type Client struct {
        // arvadosclient.ArvadosClient.)
        KeepServiceURIs []string `json:",omitempty"`
 
+       // HTTP headers to add/override in outgoing requests.
+       SendHeader http.Header
+
        dd *DiscoveryDocument
 
        ctx context.Context
@@ -144,9 +147,22 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
        return c.httpClient().Do(req)
 }
 
+func isRedirectStatus(code int) bool {
+       switch code {
+       case http.StatusMovedPermanently, http.StatusFound, http.StatusSeeOther, http.StatusTemporaryRedirect, http.StatusPermanentRedirect:
+               return true
+       default:
+               return false
+       }
+}
+
 // DoAndDecode performs req and unmarshals the response (which must be
 // JSON) into dst. Use this instead of RequestAndDecode if you need
 // more control of the http.Request object.
+//
+// If the response status indicates an HTTP redirect, the Location
+// header value is unmarshalled to dst as a RedirectLocation
+// key/field.
 func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
        resp, err := c.Do(req)
        if err != nil {
@@ -157,13 +173,28 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
        if err != nil {
                return err
        }
-       if resp.StatusCode != 200 {
-               return newTransactionError(req, resp, buf)
-       }
-       if dst == nil {
+       switch {
+       case resp.StatusCode == http.StatusOK && dst == nil:
                return nil
+       case resp.StatusCode == http.StatusOK:
+               return json.Unmarshal(buf, dst)
+
+       // If the caller uses a client with a custom CheckRedirect
+       // func, Do() might return the 3xx response instead of
+       // following it.
+       case isRedirectStatus(resp.StatusCode) && dst == nil:
+               return nil
+       case isRedirectStatus(resp.StatusCode):
+               // Copy the redirect target URL to dst.RedirectLocation.
+               buf, err := json.Marshal(map[string]string{"RedirectLocation": resp.Header.Get("Location")})
+               if err != nil {
+                       return err
+               }
+               return json.Unmarshal(buf, dst)
+
+       default:
+               return newTransactionError(req, resp, buf)
        }
-       return json.Unmarshal(buf, dst)
 }
 
 // Convert an arbitrary struct to url.Values. For example,
@@ -268,6 +299,9 @@ func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, m
        }
        req = req.WithContext(ctx)
        req.Header.Set("Content-type", "application/x-www-form-urlencoded")
+       for k, v := range c.SendHeader {
+               req.Header[k] = v
+       }
        return c.DoAndDecode(dst, req)
 }
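
Putting the two pieces above together: rpc.Conn installs a CheckRedirect func that stops the client from following 3xx responses, and DoAndDecode then reports the Location header as a RedirectLocation field instead of an error. A self-contained sketch of the underlying net/http behavior (stub server, hypothetical URLs):

    package main

    import (
        "fmt"
        "net/http"
        "net/http/httptest"
    )

    func main() {
        // Stub server that always redirects, as a login endpoint would.
        srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            http.Redirect(w, r, "https://sso.example.com/auth", http.StatusFound)
        }))
        defer srv.Close()

        client := &http.Client{
            // Same trick as rpc.NewConn: hand back the 302 instead of following it.
            CheckRedirect: func(*http.Request, []*http.Request) error { return http.ErrUseLastResponse },
        }
        resp, err := client.Get(srv.URL + "/login")
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        // DoAndDecode would copy this header into dst's RedirectLocation field.
        fmt.Println(resp.StatusCode, resp.Header.Get("Location"))
    }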
 
index 638c56fb95d16b28f57216dca86145b77353319c..6ec8f345d75f430ecd4fe8ffab4bfa665f569770 100644 (file)
@@ -132,8 +132,10 @@ type Cluster struct {
                Repositories string
        }
        Login struct {
-               ProviderAppSecret  string
+               GoogleClientID     string
+               GoogleClientSecret string
                ProviderAppID      string
+               ProviderAppSecret  string
                LoginCluster       string
                RemoteTokenRefresh Duration
        }
@@ -208,6 +210,7 @@ type Cluster struct {
                UserProfileFormMessage string
                VocabularyURL          string
                WelcomePageHTML        string
+               InactivePageHTML       string
        }
 
        EnableBetaController14287 bool
index 3058a7609c6665dfd22c2ee6bb205685c97afe7d..d06aba3695adc37f3d74057de2568778bcd9f9c9 100644 (file)
@@ -58,6 +58,9 @@ type FileSystem interface {
        // while locking multiple inodes.
        locker() sync.Locker
 
+       // throttle for limiting concurrent background writers
+       throttle() *throttle
+
        // create a new node with nil parent.
        newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
 
@@ -86,7 +89,19 @@ type FileSystem interface {
        Remove(name string) error
        RemoveAll(name string) error
        Rename(oldname, newname string) error
+
+       // Write buffered data from memory to storage, returning when
+       // all updates have been saved to persistent storage.
        Sync() error
+
+       // Write buffered data from memory to storage, but don't wait
+       // for all writes to finish before returning. If shortBlocks
+       // is true, flush everything; otherwise, if there's less than
+       // a full block of buffered data at the end of a stream, leave
+       // it buffered in memory in case more data can be appended. If
+       // path is "", flush all dirs/streams; otherwise, flush only
+       // the specified dir/stream.
+       Flush(path string, shortBlocks bool) error
 }
 
 type inode interface {
@@ -288,12 +303,17 @@ type fileSystem struct {
        root inode
        fsBackend
        mutex sync.Mutex
+       thr   *throttle
 }
 
 func (fs *fileSystem) rootnode() inode {
        return fs.root
 }
 
+func (fs *fileSystem) throttle() *throttle {
+       return fs.thr
+}
+
 func (fs *fileSystem) locker() sync.Locker {
        return &fs.mutex
 }
@@ -560,6 +580,11 @@ func (fs *fileSystem) Sync() error {
        return ErrInvalidOperation
 }
 
+func (fs *fileSystem) Flush(string, bool) error {
+       log.Printf("TODO: flush fileSystem")
+       return ErrInvalidOperation
+}
+
 // rlookup (recursive lookup) returns the inode for the file/directory
 // with the given name (which may contain "/" separators). If no such
 // file/directory exists, the returned node is nil.
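
A short usage sketch of the Sync/Flush contract documented above (the path names are hypothetical):

    // Assuming fs is a CollectionFileSystem with buffered writes:
    //
    //     fs.Flush("output/logs", false) // flush full blocks of files directly in output/logs; keep short tails buffered
    //     fs.Flush("", false)            // flush full blocks in every dir/stream, recursively
    //     fs.Flush("", true)             // flush everything, including short trailing blocks
    //     err := fs.Sync()               // flush everything and wait until it is all persisted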
index 972b3979fcfa4dd7fdb3cde62a90eacd37b27c56..b3e6aa96e46473a11a6a839aa1a3e35693b25cdb 100644 (file)
@@ -21,8 +21,7 @@ import (
 
 var (
        maxBlockSize      = 1 << 26
-       concurrentWriters = 4 // max goroutines writing to Keep during sync()
-       writeAheadBlocks  = 1 // max background jobs flushing to Keep before blocking writes
+       concurrentWriters = 4 // max goroutines writing to Keep in background and during flush()
 )
 
 // A CollectionFileSystem is a FileSystem that can be serialized as a
@@ -38,6 +37,9 @@ type CollectionFileSystem interface {
 
        // Total data bytes in all files.
        Size() int64
+
+       // Memory consumed by buffered file data.
+       memorySize() int64
 }
 
 type collectionFileSystem struct {
@@ -57,6 +59,7 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
                uuid: c.UUID,
                fileSystem: fileSystem{
                        fsBackend: keepBackend{apiClient: client, keepClient: kc},
+                       thr:       newThrottle(concurrentWriters),
                },
        }
        root := &dirnode{
@@ -143,10 +146,48 @@ func (fs *collectionFileSystem) Sync() error {
        return nil
 }
 
+func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
+       node, err := rlookup(fs.fileSystem.root, path)
+       if err != nil {
+               return err
+       }
+       dn, ok := node.(*dirnode)
+       if !ok {
+               return ErrNotADirectory
+       }
+       dn.Lock()
+       defer dn.Unlock()
+       names := dn.sortedNames()
+       if path != "" {
+               // Caller only wants to flush the specified dir,
+               // non-recursively.  Drop subdirs from the list of
+               // names.
+               var filenames []string
+               for _, name := range names {
+                       if _, ok := dn.inodes[name].(*filenode); ok {
+                               filenames = append(filenames, name)
+                       }
+               }
+               names = filenames
+       }
+       for _, name := range names {
+               child := dn.inodes[name]
+               child.Lock()
+               defer child.Unlock()
+       }
+       return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
+}
+
+func (fs *collectionFileSystem) memorySize() int64 {
+       fs.fileSystem.root.Lock()
+       defer fs.fileSystem.root.Unlock()
+       return fs.fileSystem.root.(*dirnode).memorySize()
+}
+
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
        fs.fileSystem.root.Lock()
        defer fs.fileSystem.root.Unlock()
-       return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
+       return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
 }
 
 func (fs *collectionFileSystem) Size() int64 {
@@ -238,7 +279,6 @@ type filenode struct {
        memsize  int64 // bytes in memSegments
        sync.RWMutex
        nullnode
-       throttle *throttle
 }
 
 // caller must have lock
@@ -501,12 +541,8 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 // Write some data out to disk to reduce memory use. Caller must have
 // write lock.
 func (fn *filenode) pruneMemSegments() {
-       // TODO: share code with (*dirnode)sync()
+       // TODO: share code with (*dirnode)flush()
        // TODO: pack/flush small blocks too, when fragmented
-       if fn.throttle == nil {
-               // TODO: share a throttle with filesystem
-               fn.throttle = newThrottle(writeAheadBlocks)
-       }
        for idx, seg := range fn.segments {
                seg, ok := seg.(*memSegment)
                if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
@@ -522,14 +558,14 @@ func (fn *filenode) pruneMemSegments() {
                // progress, block here until one finishes, rather
                // than pile up an unlimited number of buffered writes
                // and network flush operations.
-               fn.throttle.Acquire()
+               fn.fs.throttle().Acquire()
                go func() {
                        defer close(done)
                        locator, _, err := fn.FS().PutB(buf)
-                       fn.throttle.Release()
+                       fn.fs.throttle().Release()
                        fn.Lock()
                        defer fn.Unlock()
-                       if curbuf := seg.buf[:1]; &curbuf[0] != &buf[0] {
+                       if seg.flushing != done {
                                // A new seg.buf has been allocated.
                                return
                        }
@@ -556,8 +592,8 @@ func (fn *filenode) pruneMemSegments() {
        }
 }
 
-// Block until all pending pruneMemSegments work is finished. Caller
-// must NOT have lock.
+// Block until all pending pruneMemSegments/flush work is
+// finished. Caller must NOT have lock.
 func (fn *filenode) waitPrune() {
        var pending []<-chan struct{}
        fn.Lock()
@@ -613,51 +649,141 @@ type fnSegmentRef struct {
 // storedSegments that reference the relevant portions of the new
 // block.
 //
+// bufsize is the total data size in refs. It is used to preallocate
+// the correct amount of memory when len(refs)>1.
+//
+// If sync is false, commitBlock returns right away, after starting a
+// goroutine to do the writes, reacquire the filenodes' locks, and
+// swap out the *memSegments. Some filenodes' segments might get
+// modified/rearranged in the meantime, in which case commitBlock
+// won't replace them.
+//
 // Caller must have write lock.
-func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
+func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error {
        if len(refs) == 0 {
                return nil
        }
-       throttle.Acquire()
-       defer throttle.Release()
        if err := ctx.Err(); err != nil {
                return err
        }
-       block := make([]byte, 0, maxBlockSize)
+       done := make(chan struct{})
+       var block []byte
+       segs := make([]*memSegment, 0, len(refs))
+       offsets := make([]int, 0, len(refs)) // location of segment's data within block
        for _, ref := range refs {
-               block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
-       }
-       locator, _, err := dn.fs.PutB(block)
-       if err != nil {
-               return err
+               seg := ref.fn.segments[ref.idx].(*memSegment)
+               if seg.flushing != nil && !sync {
+                       // Let the other flushing goroutine finish. If
+                       // it fails, we'll try again next time.
+                       return nil
+               } else {
+                       // In sync mode, we proceed regardless of
+                       // whether another flush is in progress: It
+                       // can't finish before we do, because we hold
+                       // fn's lock until we finish our own writes.
+               }
+               seg.flushing = done
+               offsets = append(offsets, len(block))
+               if len(refs) == 1 {
+                       block = seg.buf
+               } else if block == nil {
+                       block = append(make([]byte, 0, bufsize), seg.buf...)
+               } else {
+                       block = append(block, seg.buf...)
+               }
+               segs = append(segs, seg)
        }
-       off := 0
-       for _, ref := range refs {
-               data := ref.fn.segments[ref.idx].(*memSegment).buf
-               ref.fn.segments[ref.idx] = storedSegment{
-                       kc:      dn.fs,
-                       locator: locator,
-                       size:    len(block),
-                       offset:  off,
-                       length:  len(data),
+       dn.fs.throttle().Acquire()
+       errs := make(chan error, 1)
+       go func() {
+               defer close(done)
+               defer close(errs)
+               locked := map[*filenode]bool{}
+               locator, _, err := dn.fs.PutB(block)
+               dn.fs.throttle().Release()
+               {
+                       if !sync {
+                               for _, name := range dn.sortedNames() {
+                                       if fn, ok := dn.inodes[name].(*filenode); ok {
+                                               fn.Lock()
+                                               defer fn.Unlock()
+                                               locked[fn] = true
+                                       }
+                               }
+                       }
+                       defer func() {
+                               for _, seg := range segs {
+                                       if seg.flushing == done {
+                                               seg.flushing = nil
+                                       }
+                               }
+                       }()
+               }
+               if err != nil {
+                       errs <- err
+                       return
                }
-               off += len(data)
-               ref.fn.memsize -= int64(len(data))
+               for idx, ref := range refs {
+                       if !sync {
+                               // In async mode, fn's lock was
+                               // released while we were waiting for
+                               // PutB(); lots of things might have
+                               // changed.
+                               if len(ref.fn.segments) <= ref.idx {
+                                       // file segments have
+                                       // rearranged or changed in
+                                       // some way
+                                       continue
+                               } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
+                                       // segment has been replaced
+                                       continue
+                               } else if seg.flushing != done {
+                                       // seg.buf has been replaced
+                                       continue
+                               } else if !locked[ref.fn] {
+                                       // file was renamed, moved, or
+                                       // deleted since we called
+                                       // PutB
+                                       continue
+                               }
+                       }
+                       data := ref.fn.segments[ref.idx].(*memSegment).buf
+                       ref.fn.segments[ref.idx] = storedSegment{
+                               kc:      dn.fs,
+                               locator: locator,
+                               size:    len(block),
+                               offset:  offsets[idx],
+                               length:  len(data),
+                       }
+                       ref.fn.memsize -= int64(len(data))
+               }
+       }()
+       if sync {
+               return <-errs
+       } else {
+               return nil
        }
-       return nil
 }
 
-// sync flushes in-memory data and remote block references (for the
+type flushOpts struct {
+       sync        bool
+       shortBlocks bool
+}
+
+// flush in-memory data and remote-cluster block references (for the
 // children with the given names, which must be children of dn) to
-// local persistent storage. Caller must have write lock on dn and the
-// named children.
-func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
+// local-cluster persistent storage.
+//
+// Caller must have write lock on dn and the named children.
+//
+// If any children are dirs, they will be flushed recursively.
+func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error {
        cg := newContextGroup(ctx)
        defer cg.Cancel()
 
-       goCommit := func(refs []fnSegmentRef) {
+       goCommit := func(refs []fnSegmentRef, bufsize int) {
                cg.Go(func() error {
-                       return dn.commitBlock(cg.Context(), throttle, refs)
+                       return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync)
                })
        }
 
@@ -665,47 +791,87 @@ func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string)
        var pendingLen int = 0
        localLocator := map[string]string{}
        for _, name := range names {
-               fn, ok := dn.inodes[name].(*filenode)
-               if !ok {
-                       continue
-               }
-               for idx, seg := range fn.segments {
-                       switch seg := seg.(type) {
-                       case storedSegment:
-                               loc, ok := localLocator[seg.locator]
-                               if !ok {
-                                       var err error
-                                       loc, err = dn.fs.LocalLocator(seg.locator)
-                                       if err != nil {
-                                               return err
+               switch node := dn.inodes[name].(type) {
+               case *dirnode:
+                       grandchildNames := node.sortedNames()
+                       for _, grandchildName := range grandchildNames {
+                               grandchild := node.inodes[grandchildName]
+                               grandchild.Lock()
+                               defer grandchild.Unlock()
+                       }
+                       cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) })
+               case *filenode:
+                       for idx, seg := range node.segments {
+                               switch seg := seg.(type) {
+                               case storedSegment:
+                                       loc, ok := localLocator[seg.locator]
+                                       if !ok {
+                                               var err error
+                                               loc, err = dn.fs.LocalLocator(seg.locator)
+                                               if err != nil {
+                                                       return err
+                                               }
+                                               localLocator[seg.locator] = loc
                                        }
-                                       localLocator[seg.locator] = loc
-                               }
-                               seg.locator = loc
-                               fn.segments[idx] = seg
-                       case *memSegment:
-                               if seg.Len() > maxBlockSize/2 {
-                                       goCommit([]fnSegmentRef{{fn, idx}})
-                                       continue
-                               }
-                               if pendingLen+seg.Len() > maxBlockSize {
-                                       goCommit(pending)
-                                       pending = nil
-                                       pendingLen = 0
+                                       seg.locator = loc
+                                       node.segments[idx] = seg
+                               case *memSegment:
+                                       if seg.Len() > maxBlockSize/2 {
+                                               goCommit([]fnSegmentRef{{node, idx}}, seg.Len())
+                                               continue
+                                       }
+                                       if pendingLen+seg.Len() > maxBlockSize {
+                                               goCommit(pending, pendingLen)
+                                               pending = nil
+                                               pendingLen = 0
+                                       }
+                                       pending = append(pending, fnSegmentRef{node, idx})
+                                       pendingLen += seg.Len()
+                               default:
+                                       panic(fmt.Sprintf("can't sync segment type %T", seg))
                                }
-                               pending = append(pending, fnSegmentRef{fn, idx})
-                               pendingLen += seg.Len()
-                       default:
-                               panic(fmt.Sprintf("can't sync segment type %T", seg))
                        }
                }
        }
-       goCommit(pending)
+       if opts.shortBlocks {
+               goCommit(pending, pendingLen)
+       }
        return cg.Wait()
 }
 
 // caller must have write lock.
-func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
+func (dn *dirnode) memorySize() (size int64) {
+       for _, name := range dn.sortedNames() {
+               node := dn.inodes[name]
+               node.Lock()
+               defer node.Unlock()
+               switch node := node.(type) {
+               case *dirnode:
+                       size += node.memorySize()
+               case *filenode:
+                       for _, seg := range node.segments {
+                               switch seg := seg.(type) {
+                               case *memSegment:
+                                       size += int64(seg.Len())
+                               }
+                       }
+               }
+       }
+       return
+}
+
+// caller must have write lock.
+func (dn *dirnode) sortedNames() []string {
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+       return names
+}
+
+// caller must have write lock.
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
        cg := newContextGroup(ctx)
        defer cg.Cancel()
 
@@ -720,11 +886,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
                return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
        }
 
-       names := make([]string, 0, len(dn.inodes))
-       for name := range dn.inodes {
-               names = append(names, name)
-       }
-       sort.Strings(names)
+       names := dn.sortedNames()
 
        // Wait for children to finish any pending write operations
        // before locking them.
@@ -756,7 +918,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
        for i, name := range dirnames {
                i, name := i, name
                cg.Go(func() error {
-                       txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
+                       txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
                        subdirs[i] = txt
                        return err
                })
@@ -772,7 +934,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
 
                var fileparts []filepart
                var blocks []string
-               if err := dn.sync(cg.Context(), throttle, names); err != nil {
+               if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
                        return err
                }
                for _, name := range filenames {
@@ -805,7 +967,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
                                default:
                                        // This can't happen: we
                                        // haven't unlocked since
-                                       // calling sync().
+                                       // calling flush(sync=true).
                                        panic(fmt.Sprintf("can't marshal segment type %T", seg))
                                }
                        }
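
The throttle used throughout this file (newThrottle(concurrentWriters), fs.throttle().Acquire()/Release()) is defined elsewhere in the SDK and is not part of this diff. A minimal semaphore-style sketch that is consistent with how it is called here, though not necessarily the real implementation, would be:

    package sketch

    // throttle limits how many goroutines may hold a slot at once.
    // Illustrative stand-in only; the real type lives elsewhere.
    type throttle struct {
        sem chan struct{}
    }

    func newThrottle(n int) *throttle {
        return &throttle{sem: make(chan struct{}, n)}
    }

    // Acquire blocks until one of the n slots is free.
    func (t *throttle) Acquire() { t.sem <- struct{}{} }

    // Release frees a slot for the next waiter.
    func (t *throttle) Release() { <-t.sem }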
index 7fd03b120a7f34240393f884f88992b885499e1f..352b226bf1f63bfbf1ac94b211de57d3ec25f581 100644 (file)
@@ -535,7 +535,7 @@ func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
        }
 
        maxBlockSize = 8
-       defer func() { maxBlockSize = 2 << 26 }()
+       defer func() { maxBlockSize = 1 << 26 }()
 
        var wg sync.WaitGroup
        for n := 0; n < 128; n++ {
@@ -1039,12 +1039,12 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
        c.Check(err, check.ErrorMatches, `invalid flag.*`)
 }
 
-func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
-       defer func(wab, mbs int) {
-               writeAheadBlocks = wab
+func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
+       defer func(cw, mbs int) {
+               concurrentWriters = cw
                maxBlockSize = mbs
-       }(writeAheadBlocks, maxBlockSize)
-       writeAheadBlocks = 2
+       }(concurrentWriters, maxBlockSize)
+       concurrentWriters = 2
        maxBlockSize = 1024
 
        proceed := make(chan struct{})
@@ -1069,7 +1069,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
                default:
                        time.Sleep(time.Millisecond)
                }
-               c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+               c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
        }
 
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
@@ -1105,6 +1105,181 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
        c.Check(currentMemExtents(), check.HasLen, 0)
 }
 
+// Ensure blocks get flushed to disk if a lot of data is written to
+// small files/directories without calling sync().
+//
+// Write four 512KiB files into each of 256 top-level dirs (total
+// 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
+// exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
+// 2MiB).
+func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       s.kc.onPut = func([]byte) {
+               // discard flushed data -- otherwise the stub will use
+               // unlimited memory
+               time.Sleep(time.Millisecond)
+               s.kc.Lock()
+               defer s.kc.Unlock()
+               s.kc.blocks = map[string][]byte{}
+       }
+       for i := 0; i < 256; i++ {
+               buf := bytes.NewBuffer(make([]byte, 524288))
+               fmt.Fprintf(buf, "test file in dir%d", i)
+
+               dir := fmt.Sprintf("dir%d", i)
+               fs.Mkdir(dir, 0755)
+               for j := 0; j < 2; j++ {
+                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       _, err = io.Copy(f, buf)
+                       c.Assert(err, check.IsNil)
+               }
+
+               if i%8 == 0 {
+                       fs.Flush("", true)
+               }
+
+               size := fs.memorySize()
+               if !c.Check(size <= 1<<24, check.Equals, true) {
+                       c.Logf("at dir%d fs.memorySize()=%d", i, size)
+                       return
+               }
+       }
+}
+
+// Ensure short blocks at the end of a stream don't get flushed by
+// Flush(false).
+//
+// Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
+// blocks have been flushed while 8x 3MiB is still buffered in memory.
+func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       var flushed int64
+       s.kc.onPut = func(p []byte) {
+               atomic.AddInt64(&flushed, int64(len(p)))
+       }
+
+       nDirs := int64(8)
+       megabyte := make([]byte, 1<<20)
+       for i := int64(0); i < nDirs; i++ {
+               dir := fmt.Sprintf("dir%d", i)
+               fs.Mkdir(dir, 0755)
+               for j := 0; j < 67; j++ {
+                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       _, err = f.Write(megabyte)
+                       c.Assert(err, check.IsNil)
+               }
+       }
+       c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
+       c.Check(flushed, check.Equals, int64(0))
+
+       waitForFlush := func(expectUnflushed, expectFlushed int64) {
+               for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+               }
+               c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+               c.Check(flushed, check.Equals, expectFlushed)
+       }
+
+       // Nothing flushed yet
+       waitForFlush((nDirs*67)<<20, 0)
+
+       // Flushing a non-empty dir "/" is non-recursive and there are
+       // no top-level files, so this has no effect
+       fs.Flush("/", false)
+       waitForFlush((nDirs*67)<<20, 0)
+
+       // Flush the full block in dir0
+       fs.Flush("dir0", false)
+       waitForFlush((nDirs*67-64)<<20, 64<<20)
+
+       err = fs.Flush("dir-does-not-exist", false)
+       c.Check(err, check.NotNil)
+
+       // Flush full blocks in all dirs
+       fs.Flush("", false)
+       waitForFlush(nDirs*3<<20, nDirs*64<<20)
+
+       // Flush non-full blocks, too
+       fs.Flush("", true)
+       waitForFlush(0, nDirs*67<<20)
+}
+
+// Even when writing lots of files/dirs from different goroutines, as
+// long as Flush(dir,false) is called after writing each file,
+// unflushed data should be limited to one full block per
+// concurrentWriter, plus one nearly-full block at the end of each
+// dir/stream.
+func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
+       nDirs := int64(8)
+       maxUnflushed := (int64(concurrentWriters) + nDirs) << 26
+
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       release := make(chan struct{})
+       timeout := make(chan struct{})
+       time.AfterFunc(10*time.Second, func() { close(timeout) })
+       var putCount, concurrency int64
+       var unflushed int64
+       s.kc.onPut = func(p []byte) {
+               defer atomic.AddInt64(&unflushed, -int64(len(p)))
+               cur := atomic.AddInt64(&concurrency, 1)
+               defer atomic.AddInt64(&concurrency, -1)
+               pc := atomic.AddInt64(&putCount, 1)
+               if pc < int64(concurrentWriters) {
+                       // Block until we reach concurrentWriters, to
+                       // make sure we're really accepting concurrent
+                       // writes.
+                       select {
+                       case <-release:
+                       case <-timeout:
+                               c.Error("timeout")
+                       }
+               } else if pc == int64(concurrentWriters) {
+                       // Unblock the first N-1 PUT reqs.
+                       close(release)
+               }
+               c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
+               c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
+       }
+
+       var owg sync.WaitGroup
+       megabyte := make([]byte, 1<<20)
+       for i := int64(0); i < nDirs; i++ {
+               dir := fmt.Sprintf("dir%d", i)
+               fs.Mkdir(dir, 0755)
+               owg.Add(1)
+               go func() {
+                       defer owg.Done()
+                       defer fs.Flush(dir, true)
+                       var iwg sync.WaitGroup
+                       defer iwg.Wait()
+                       for j := 0; j < 67; j++ {
+                               iwg.Add(1)
+                               go func(j int) {
+                                       defer iwg.Done()
+                                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+                                       c.Assert(err, check.IsNil)
+                                       defer f.Close()
+                                       n, err := f.Write(megabyte)
+                                       c.Assert(err, check.IsNil)
+                                       atomic.AddInt64(&unflushed, int64(n))
+                                       fs.Flush(dir, false)
+                               }(j)
+                       }
+               }()
+       }
+       owg.Wait()
+       fs.Flush("", true)
+}
+
 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
        for _, txt := range []string{
                "\n",
index 82114e2ea9ed54ac89cc94dd387d73711efae561..4264be4fa600be7abb05351f506feb00dccac6cc 100644 (file)
@@ -21,6 +21,7 @@ type CustomFileSystem interface {
 type customFileSystem struct {
        fileSystem
        root *vdirnode
+       thr  *throttle
 
        staleThreshold time.Time
        staleLock      sync.Mutex
@@ -33,6 +34,7 @@ func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
                fileSystem: fileSystem{
                        fsBackend: keepBackend{apiClient: c, keepClient: kc},
                        root:      root,
+                       thr:       newThrottle(concurrentWriters),
                },
        }
        root.inode = &treenode{
diff --git a/sdk/go/arvados/login.go b/sdk/go/arvados/login.go
new file mode 100644 (file)
index 0000000..8c51546
--- /dev/null
@@ -0,0 +1,24 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+       "bytes"
+       "net/http"
+)
+
+type LoginResponse struct {
+       RedirectLocation string
+       HTML             bytes.Buffer
+}
+
+func (resp LoginResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+       if resp.RedirectLocation != "" {
+               w.Header().Set("Location", resp.RedirectLocation)
+               w.WriteHeader(http.StatusFound)
+       } else {
+               w.Write(resp.HTML.Bytes())
+       }
+}
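
Because LoginResponse implements http.Handler, the router's sendResponse short-circuit earlier in this change simply hands it the ResponseWriter. A small usage sketch with httptest, assuming the import path shown elsewhere in this diff:

    package main

    import (
        "fmt"
        "net/http/httptest"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func main() {
        resp := arvados.LoginResponse{RedirectLocation: "https://accounts.example.com/auth"}
        rec := httptest.NewRecorder()
        resp.ServeHTTP(rec, httptest.NewRequest("GET", "/login", nil))
        fmt.Println(rec.Code, rec.Header().Get("Location")) // 302 and the redirect target
    }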
index 850bd0639dcaa856c5b6dfa69b09309a1ead1de5..24e9f190865b0456a8be431449239e2ec3aba6db 100644 (file)
@@ -8,6 +8,7 @@ import (
        "context"
        "encoding/json"
        "errors"
+       "net/url"
        "reflect"
        "runtime"
        "sync"
@@ -24,10 +25,18 @@ type APIStub struct {
        mtx   sync.Mutex
 }
 
+// BaseURL implements federation.backend
+func (as *APIStub) BaseURL() url.URL {
+       return url.URL{Scheme: "https", Host: "apistub.example.com"}
+}
 func (as *APIStub) ConfigGet(ctx context.Context) (json.RawMessage, error) {
        as.appendCall(as.ConfigGet, ctx, nil)
        return nil, as.Error
 }
+func (as *APIStub) Login(ctx context.Context, options arvados.LoginOptions) (arvados.LoginResponse, error) {
+       as.appendCall(as.Login, ctx, options)
+       return arvados.LoginResponse{}, as.Error
+}
 func (as *APIStub) CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error) {
        as.appendCall(as.CollectionCreate, ctx, options)
        return arvados.Collection{}, as.Error
diff --git a/sdk/go/arvadostest/proxy.go b/sdk/go/arvadostest/proxy.go
new file mode 100644 (file)
index 0000000..015061a
--- /dev/null
@@ -0,0 +1,72 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvadostest
+
+import (
+       "crypto/tls"
+       "net"
+       "net/http"
+       "net/http/httptest"
+       "net/http/httputil"
+       "net/url"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "gopkg.in/check.v1"
+)
+
+type Proxy struct {
+       *httptest.Server
+
+       // URL where the proxy is listening. Same as Server.URL, but
+       // with parsing already done for you.
+       URL *url.URL
+
+       // A dump of each request that has been proxied.
+       RequestDumps [][]byte
+}
+
+// NewProxy returns a new Proxy that saves a dump of each request
+// before forwarding to the indicated service.
+func NewProxy(c *check.C, svc arvados.Service) *Proxy {
+       var target url.URL
+       c.Assert(svc.InternalURLs, check.HasLen, 1)
+       for u := range svc.InternalURLs {
+               target = url.URL(u)
+               break
+       }
+       rp := httputil.NewSingleHostReverseProxy(&target)
+       rp.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
+               dump, _ := httputil.DumpRequest(r, false)
+               c.Logf("arvadostest.Proxy ErrorHandler(%s): %s\n%s", r.URL, err, dump)
+               http.Error(w, err.Error(), http.StatusBadGateway)
+       }
+       rp.Transport = &http.Transport{
+               DialContext: (&net.Dialer{
+                       Timeout:   30 * time.Second,
+                       KeepAlive: 30 * time.Second,
+                       DualStack: true,
+               }).DialContext,
+               MaxIdleConns:          100,
+               IdleConnTimeout:       90 * time.Second,
+               TLSHandshakeTimeout:   10 * time.Second,
+               ExpectContinueTimeout: 1 * time.Second,
+               TLSClientConfig:       &tls.Config{InsecureSkipVerify: true},
+       }
+       srv := httptest.NewServer(rp)
+       u, err := url.Parse(srv.URL)
+       c.Assert(err, check.IsNil)
+       proxy := &Proxy{
+               Server: srv,
+               URL:    u,
+       }
+       rp.Director = func(r *http.Request) {
+               dump, _ := httputil.DumpRequest(r, true)
+               proxy.RequestDumps = append(proxy.RequestDumps, dump)
+               r.URL.Scheme = target.Scheme
+               r.URL.Host = target.Host
+       }
+       return proxy
+}
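
A hedged usage sketch for NewProxy inside a gocheck test; the cluster config field name here is an assumption based on how this proxy is meant to spy on the Rails API:

    // Inside a *check.C test, given a cluster config already loaded:
    //
    //     railsSpy := arvadostest.NewProxy(c, cluster.Services.RailsAPI)
    //     defer railsSpy.Close()
    //     // ...point the code under test at railsSpy.URL and make requests...
    //     for _, dump := range railsSpy.RequestDumps {
    //             c.Logf("proxied request:\n%s", dump)
    //     }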
index f64708454c2b1e12cb5a75906d7d43676629cfb5..50191eb3f0b04f36faedabf8f023a344a42b264e 100644 (file)
@@ -78,11 +78,15 @@ func logResponse(w *responseTimer, req *http.Request, lgr *logrus.Entry) {
        if respCode == 0 {
                respCode = http.StatusOK
        }
-       lgr.WithFields(logrus.Fields{
+       fields := logrus.Fields{
                "respStatusCode": respCode,
                "respStatus":     http.StatusText(respCode),
                "respBytes":      w.WroteBodyBytes(),
-       }).Info("response")
+       }
+       if respCode >= 400 {
+               fields["respBody"] = string(w.Sniffed())
+       }
+       lgr.WithFields(fields).Info("response")
 }
 
 type responseTimer struct {
index 3b2bc7758069b44345b3da522b8f80cc303c52fe..eb71fcd814fdaf87262553f47731a8c2a555ba0e 100644 (file)
@@ -8,6 +8,7 @@ import (
        "bytes"
        "context"
        "encoding/json"
+       "fmt"
        "net/http"
        "net/http/httptest"
        "testing"
@@ -24,17 +25,23 @@ func Test(t *testing.T) {
 
 var _ = check.Suite(&Suite{})
 
-type Suite struct{}
+type Suite struct {
+       ctx     context.Context
+       log     *logrus.Logger
+       logdata *bytes.Buffer
+}
 
-func (s *Suite) TestLogRequests(c *check.C) {
-       captured := &bytes.Buffer{}
-       log := logrus.New()
-       log.Out = captured
-       log.Formatter = &logrus.JSONFormatter{
+func (s *Suite) SetUpTest(c *check.C) {
+       s.logdata = bytes.NewBuffer(nil)
+       s.log = logrus.New()
+       s.log.Out = s.logdata
+       s.log.Formatter = &logrus.JSONFormatter{
                TimestampFormat: time.RFC3339Nano,
        }
-       ctx := ctxlog.Context(context.Background(), log)
+       s.ctx = ctxlog.Context(context.Background(), s.log)
+}
 
+func (s *Suite) TestLogRequests(c *check.C) {
        h := AddRequestIDs(LogRequests(
                http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
                        w.Write([]byte("hello world"))
@@ -45,9 +52,9 @@ func (s *Suite) TestLogRequests(c *check.C) {
        c.Assert(err, check.IsNil)
        resp := httptest.NewRecorder()
 
-       HandlerWithContext(ctx, h).ServeHTTP(resp, req)
+       HandlerWithContext(s.ctx, h).ServeHTTP(resp, req)
 
-       dec := json.NewDecoder(captured)
+       dec := json.NewDecoder(s.logdata)
 
        gotReq := make(map[string]interface{})
        err = dec.Decode(&gotReq)
@@ -72,3 +79,46 @@ func (s *Suite) TestLogRequests(c *check.C) {
                c.Check(gotResp[key].(float64), check.Not(check.Equals), float64(0))
        }
 }
+
+func (s *Suite) TestLogErrorBody(c *check.C) {
+       dec := json.NewDecoder(s.logdata)
+
+       for _, trial := range []struct {
+               label      string
+               statusCode int
+               sentBody   string
+               expectLog  bool
+               expectBody string
+       }{
+               {"ok", 200, "hello world", false, ""},
+               {"redir", 302, "<a href='http://foo.example/baz'>redir</a>", false, ""},
+               {"4xx short body", 400, "oops", true, "oops"},
+               {"4xx long body", 400, fmt.Sprintf("%0*d", sniffBytes*2, 1), true, fmt.Sprintf("%0*d", sniffBytes, 0)},
+               {"5xx empty body", 500, "", true, ""},
+       } {
+               comment := check.Commentf("in trial: %q", trial.label)
+
+               req, err := http.NewRequest("GET", "https://foo.example/bar", nil)
+               c.Assert(err, check.IsNil)
+               resp := httptest.NewRecorder()
+
+               HandlerWithContext(s.ctx, LogRequests(
+                       http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+                               w.WriteHeader(trial.statusCode)
+                               w.Write([]byte(trial.sentBody))
+                       }),
+               )).ServeHTTP(resp, req)
+
+               gotReq := make(map[string]interface{})
+               err = dec.Decode(&gotReq)
+               c.Logf("%#v", gotReq)
+               gotResp := make(map[string]interface{})
+               err = dec.Decode(&gotResp)
+               c.Logf("%#v", gotResp)
+               if trial.expectLog {
+                       c.Check(gotResp["respBody"], check.Equals, trial.expectBody, comment)
+               } else {
+                       c.Check(gotResp["respBody"], check.IsNil, comment)
+               }
+       }
+}
index 8dea759ccb9b1772b816ad565a279975ab751c8a..049a3f1aae184de0ef522656741494db42764802 100644 (file)
@@ -8,10 +8,13 @@ import (
        "net/http"
 )
 
+const sniffBytes = 1024
+
 type ResponseWriter interface {
        http.ResponseWriter
        WroteStatus() int
        WroteBodyBytes() int
+       Sniffed() []byte
 }
 
 // responseWriter wraps http.ResponseWriter and exposes the status
@@ -19,9 +22,10 @@ type ResponseWriter interface {
 // error.
 type responseWriter struct {
        http.ResponseWriter
-       wroteStatus    int   // Last status given to WriteHeader()
+       wroteStatus    int   // First status given to WriteHeader()
        wroteBodyBytes int   // Bytes successfully written
        err            error // Last error returned from Write()
+       sniffed        []byte
 }
 
 func WrapResponseWriter(orig http.ResponseWriter) ResponseWriter {
@@ -36,13 +40,20 @@ func (w *responseWriter) CloseNotify() <-chan bool {
 }
 
 func (w *responseWriter) WriteHeader(s int) {
-       w.wroteStatus = s
+       if w.wroteStatus == 0 {
+               w.wroteStatus = s
+       }
+       // ...else it's too late to change the status seen by the
+       // client -- but we call the wrapped WriteHeader() anyway so
+       // it can log a warning.
        w.ResponseWriter.WriteHeader(s)
 }
 
 func (w *responseWriter) Write(data []byte) (n int, err error) {
        if w.wroteStatus == 0 {
                w.WriteHeader(http.StatusOK)
+       } else if w.wroteStatus >= 400 {
+               w.sniff(data)
        }
        n, err = w.ResponseWriter.Write(data)
        w.wroteBodyBytes += n
@@ -61,3 +72,17 @@ func (w *responseWriter) WroteBodyBytes() int {
 func (w *responseWriter) Err() error {
        return w.err
 }
+
+func (w *responseWriter) sniff(data []byte) {
+       max := sniffBytes - len(w.sniffed)
+       if max <= 0 {
+               return
+       } else if max < len(data) {
+               data = data[:max]
+       }
+       w.sniffed = append(w.sniffed, data...)
+}
+
+func (w *responseWriter) Sniffed() []byte {
+       return w.sniffed
+}
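
The sniff buffer above grows only until sniffBytes is reached, so a large error body cannot bloat the log entry. A standalone sketch of that capped-append behavior, with the ResponseWriter plumbing stripped away (the sniffer type is illustrative):

    package main

    import "fmt"

    // sniffBytes mirrors the cap declared in responsewriter.go.
    const sniffBytes = 1024

    // sniffer keeps at most sniffBytes of everything passed to sniff().
    type sniffer struct{ sniffed []byte }

    func (s *sniffer) sniff(data []byte) {
        max := sniffBytes - len(s.sniffed)
        if max <= 0 {
            return
        } else if max < len(data) {
            data = data[:max]
        }
        s.sniffed = append(s.sniffed, data...)
    }

    func main() {
        s := &sniffer{}
        for i := 0; i < 3; i++ {
            s.sniff(make([]byte, 600)) // 1800 bytes written in total...
        }
        fmt.Println(len(s.sniffed)) // ...but only 1024 retained
    }
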
index b41ccd3cddcca658489ed4cf0a8319b99a4619f8..3f83604f4d8f2439462def9152f6369d6bb614ad 100644 (file)
@@ -13,7 +13,7 @@ import org.arvados.client.common.Characters;
 public class FileToken {
 
     private int filePosition;
-    private int fileSize;
+    private long fileSize;
     private String fileName;
     private String path;
 
@@ -29,7 +29,7 @@ public class FileToken {
     private void splitFileTokenInfo(String fileTokenInfo) {
         String[] tokenPieces = fileTokenInfo.split(":");
         this.filePosition = Integer.parseInt(tokenPieces[0]);
-        this.fileSize = Integer.parseInt(tokenPieces[1]);
+        this.fileSize = Long.parseLong(tokenPieces[1]);
         this.fileName = tokenPieces[2].replace(Characters.SPACE, " ");
     }
 
@@ -46,7 +46,7 @@ public class FileToken {
         return this.filePosition;
     }
 
-    public int getFileSize() {
+    public long getFileSize() {
         return this.fileSize;
     }
 
index 1f694f25c2dc2bc7297f570dbfcde7653a9e7353..c1e8849e39f625128133bea1d8376e01e005ca54 100644 (file)
@@ -187,7 +187,7 @@ public class FileDownloader {
         // values for tracking file output streams and matching data chunks with initial files
         int currentDataChunkNumber;
         int bytesDownloadedFromChunk;
-        int bytesToDownload;
+        long bytesToDownload;
         byte[] currentDataChunk;
         boolean remainingDataInChunk;
         final List<KeepLocator> keepLocators;
@@ -199,11 +199,11 @@ public class FileDownloader {
             this.keepLocators = keepLocators;
         }
 
-        private int getBytesToDownload() {
+        private long getBytesToDownload() {
             return bytesToDownload;
         }
 
-        private void setBytesToDownload(int bytesToDownload) {
+        private void setBytesToDownload(long bytesToDownload) {
             this.bytesToDownload = bytesToDownload;
         }
 
@@ -244,7 +244,7 @@ public class FileDownloader {
 
         private void writeDownDataChunkPartially(FileOutputStream fos) throws IOException {
             //write all remaining bytes for this file from current chunk
-            fos.write(currentDataChunk, bytesDownloadedFromChunk, bytesToDownload);
+            fos.write(currentDataChunk, bytesDownloadedFromChunk, (int) bytesToDownload);
             // update number of bytes downloaded from this chunk
             bytesDownloadedFromChunk += bytesToDownload;
             // set remaining data in chunk to true
index 13939852cbde5fa3313bf50053204d566d2201e8..a95ea754e4b89dd30d98e535843e5afdd5e5049e 100644 (file)
@@ -15,7 +15,7 @@ public class FileTokenTest {
 
     public static final String FILE_TOKEN_INFO = "0:1024:test-file1";
     public static final int FILE_POSITION = 0;
-    public static final int FILE_LENGTH = 1024;
+    public static final long FILE_LENGTH = 1024L;
     public static final String FILE_NAME = "test-file1";
     public static final String FILE_PATH = "c" + Characters.SLASH;
 
index 8850d0bfd5a82c10633a3b39d0d5958961ead940..0ba3f0a483fac785a6d080adf5ea494c480d02b3 100755 (executable)
@@ -642,10 +642,11 @@ def create_collection_from(c, src, dst, args):
 #
 def copy_collection(obj_uuid, src, dst, args):
     if arvados.util.keep_locator_pattern.match(obj_uuid):
-        # If the obj_uuid is a portable data hash, it might not be uniquely
-        # identified with a particular collection.  As a result, it is
-        # ambigious as to what name to use for the copy.  Apply some heuristics
-        # to pick which collection to get the name from.
+        # If the obj_uuid is a portable data hash, it might not be
+        # uniquely identified with a particular collection.  As a
+        # result, it is ambiguous as to what name to use for the copy.
+        # Apply some heuristics to pick which collection to get the
+        # name from.
         srccol = src.collections().list(
             filters=[['portable_data_hash', '=', obj_uuid]],
             order="created_at asc"
index 4364229b77284e9dbaab975c626ec9e5e52c3e33..0a03399d1f0607b5412ef6b45a47a4ac230b7e8e 100644 (file)
@@ -17,10 +17,21 @@ class UserSessionsController < ApplicationController
       raise "Local login disabled when LoginCluster is set"
     end
 
-    omniauth = request.env['omniauth.auth']
+    if params[:provider] == 'controller'
+      if request.headers['Authorization'] != 'Bearer ' + Rails.configuration.SystemRootToken
+        return send_error('Invalid authorization header', status: 401)
+      end
+      # arvados-controller verified the user and is passing auth_info
+      # in request params.
+      authinfo = SafeJSON.load(params[:auth_info])
+    else
+      # omniauth middleware verified the user and is passing auth_info
+      # in request.env.
+      authinfo = request.env['omniauth.auth']['info'].with_indifferent_access
+    end
 
     begin
-      user = User.register omniauth['info']
+      user = User.register(authinfo)
     rescue => e
       Rails.logger.warn e
       return redirect_to login_failure_url
@@ -45,8 +56,6 @@ class UserSessionsController < ApplicationController
 
     user.save or raise Exception.new(user.errors.messages)
 
-    omniauth.delete('extra')
-
     # Give the authenticated user a cookie for direct API access
     session[:user_id] = user.id
     session[:api_client_uuid] = nil
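
The new 'controller' branch above expects arvados-controller to have verified the user itself and to hand the result to Rails: the request must carry the cluster's SystemRootToken as a Bearer token and a JSON-encoded auth_info parameter. A hedged sketch of what such a call might look like from the Go side; the endpoint path, HTTP method, and helper name are assumptions rather than details taken from lib/controller/localdb/login.go:

    package main

    import (
        "encoding/json"
        "fmt"
        "net/http"
        "net/url"
        "strings"
    )

    // postLogin hands verified user info to the Rails login endpoint the way
    // the "controller" provider expects: Bearer SystemRootToken in the
    // Authorization header, auth_info as a JSON-encoded parameter. The path
    // "/auth/controller/callback" is an assumption for illustration.
    func postLogin(railsURL, systemRootToken, email, returnTo string) (*http.Response, error) {
        authInfo, err := json.Marshal(map[string]string{"email": email})
        if err != nil {
            return nil, err
        }
        form := url.Values{
            "provider":  {"controller"},
            "auth_info": {string(authInfo)},
            "return_to": {returnTo},
        }
        req, err := http.NewRequest("POST", railsURL+"/auth/controller/callback", strings.NewReader(form.Encode()))
        if err != nil {
            return nil, err
        }
        req.Header.Set("Authorization", "Bearer "+systemRootToken)
        req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
        return http.DefaultClient.Do(req)
    }

    func main() {
        resp, err := postLogin("https://railsapi.internal:8004", "examplesystemroottoken", "user@example.com", ",https://workbench.example")
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println(resp.Status) // expect a redirect carrying api_token on success
    }
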
index 564274bc99f5e641b3fe1d0df4ba00967d8747af..7a3a854b3a17826117a6ff913ff5f20743f86483 100644 (file)
@@ -399,8 +399,6 @@ class User < ArvadosModel
     #   alternate_emails
     #   identity_url
 
-    info = info.with_indifferent_access
-
     primary_user = nil
 
     # local database
@@ -426,7 +424,7 @@ class User < ArvadosModel
         if !primary_user
           primary_user = user.redirects_to
         elsif primary_user.uuid != user.redirects_to.uuid
-          raise "Ambigious email address, directs to both #{primary_user.uuid} and #{user.redirects_to.uuid}"
+          raise "Ambiguous email address, directs to both #{primary_user.uuid} and #{user.redirects_to.uuid}"
         end
       end
     end
index 5546e8e406de5ec5c3c44e9ab889786391bcd4c1..f82f6e5f371490c070e8b13486208a349b28047a 100644 (file)
@@ -85,6 +85,7 @@ end
 arvcfg = ConfigLoader.new
 arvcfg.declare_config "ClusterID", NonemptyString, :uuid_prefix
 arvcfg.declare_config "ManagementToken", String, :ManagementToken
+arvcfg.declare_config "SystemRootToken", String
 arvcfg.declare_config "Git.Repositories", String, :git_repositories_dir
 arvcfg.declare_config "API.DisabledAPIs", Hash, :disable_api_methods, ->(cfg, k, v) { arrayToHash cfg, "API.DisabledAPIs", v }
 arvcfg.declare_config "API.MaxRequestSize", Integer, :max_request_size
@@ -105,8 +106,8 @@ arvcfg.declare_config "Users.EmailSubjectPrefix", String, :email_subject_prefix
 arvcfg.declare_config "Users.UserNotifierEmailFrom", String, :user_notifier_email_from
 arvcfg.declare_config "Users.NewUserNotificationRecipients", Hash, :new_user_notification_recipients, ->(cfg, k, v) { arrayToHash cfg, "Users.NewUserNotificationRecipients", v }
 arvcfg.declare_config "Users.NewInactiveUserNotificationRecipients", Hash, :new_inactive_user_notification_recipients, method(:arrayToHash)
-arvcfg.declare_config "Login.ProviderAppSecret", NonemptyString, :sso_app_secret
-arvcfg.declare_config "Login.ProviderAppID", NonemptyString, :sso_app_id
+arvcfg.declare_config "Login.ProviderAppSecret", String, :sso_app_secret
+arvcfg.declare_config "Login.ProviderAppID", String, :sso_app_id
 arvcfg.declare_config "Login.LoginCluster", String
 arvcfg.declare_config "Login.RemoteTokenRefresh", ActiveSupport::Duration
 arvcfg.declare_config "TLS.Insecure", Boolean, :sso_insecure
index d96ccb0903fc72026297a412ac474a8ac895af04..fc9475692a5933c2ed01f77e7871f4fd3942d7ec 100644 (file)
@@ -64,4 +64,23 @@ class UserSessionsControllerTest < ActionController::TestCase
     assert_nil assigns(:api_client)
   end
 
+  test "controller cannot create session without SystemRootToken" do
+    get :create, params: {provider: 'controller', auth_info: {email: "foo@bar.com"}, return_to: ',https://app.example'}
+    assert_response 401
+  end
+
+  test "controller cannot create session with wrong SystemRootToken" do
+    @request.headers['Authorization'] = 'Bearer blah'
+    get :create, params: {provider: 'controller', auth_info: {email: "foo@bar.com"}, return_to: ',https://app.example'}
+    assert_response 401
+  end
+
+  test "controller can create session using SystemRootToken" do
+    @request.headers['Authorization'] = 'Bearer '+Rails.configuration.SystemRootToken
+    get :create, params: {provider: 'controller', auth_info: {email: "foo@bar.com"}, return_to: ',https://app.example'}
+    assert_response :redirect
+    api_client_auth = assigns(:api_client_auth)
+    assert_not_nil api_client_auth
+    assert_includes(@response.redirect_url, 'api_token='+api_client_auth.token)
+  end
 end
index 28685267b77f2f1396cb616478d883bdda6811a3..adc37cc5951d231ceef9bedae120f495ad780917 100644 (file)
@@ -843,7 +843,7 @@ class UserTest < ActiveSupport::TestCase
     assert_equal "Baratheon", nbs.last_name
   end
 
-  test "fail when email address is ambigious" do
+  test "fail when email address is ambiguous" do
     User.register({"email" => "active-user@arvados.local"})
     u = User.register({"email" => "never-before-seen-user@arvados.local"})
     u.email = "active-user@arvados.local"
index 1509d7ad8b33fd24f56e80f64ad85d929b940bdb..9ca1134ed8d5375d7c417af0b3ad684f34efa847 100644 (file)
@@ -6,7 +6,6 @@
 Description=Arvados Crunch Dispatcher for SLURM
 Documentation=https://doc.arvados.org/
 After=network.target
-AssertPathExists=/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml
 
 # systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
 StartLimitInterval=0
index 3f529f6313b9ee55483efaa21367572bd54207a4..f6a64a6217f1f9c80c8e90a3756f5238fd796f06 100644 (file)
@@ -15,6 +15,7 @@ import (
        "strings"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
@@ -82,33 +83,49 @@ func (cp *copier) Copy() (string, error) {
                        return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
                }
        }
+       var unflushed int64
+       var lastparentdir string
        for _, f := range cp.files {
-               err = cp.copyFile(fs, f)
+               // If a dir has just had its last file added, do a
+               // full Flush. Otherwise, do a partial Flush (write
+               // full-size blocks, but leave the last short block
+               // open so f's data can be packed with it).
+               dir, _ := filepath.Split(f.dst)
+               if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
+                       if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
+                               return "", fmt.Errorf("error flushing output collection file data: %v", err)
+                       }
+                       unflushed = 0
+               }
+               lastparentdir = dir
+
+               n, err := cp.copyFile(fs, f)
                if err != nil {
                        return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
                }
+               unflushed += n
        }
        return fs.MarshalManifest(".")
 }
 
-func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
+func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
        cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
        dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
        if err != nil {
-               return err
+               return 0, err
        }
        src, err := os.Open(f.src)
        if err != nil {
                dst.Close()
-               return err
+               return 0, err
        }
        defer src.Close()
-       _, err = io.Copy(dst, src)
+       n, err := io.Copy(dst, src)
        if err != nil {
                dst.Close()
-               return err
+               return n, err
        }
-       return dst.Close()
+       return n, dst.Close()
 }
 
 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
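
The copy loop above flushes per directory: a full flush (including the trailing short block) once a directory's last file has been copied, and a partial flush of only full-size blocks whenever more than one block of data is pending, so small files in the same directory can still be packed together. A standalone sketch of just that decision rule; blockSize and flushDecision are illustrative stand-ins for keepclient.BLOCKSIZE and the inline logic:

    package main

    import "fmt"

    // blockSize stands in for keepclient.BLOCKSIZE (64 MiB).
    const blockSize = 1 << 26

    // flushDecision reports whether to flush before copying the next file and,
    // if so, whether the trailing short block should be written out too.
    func flushDecision(lastDir, nextDir string, unflushed int64) (flush, full bool) {
        changedDir := nextDir != lastDir
        return changedDir || unflushed > blockSize, changedDir
    }

    func main() {
        fmt.Println(flushDecision("out/a/", "out/a/", 1000))        // false false: keep packing
        fmt.Println(flushDecision("out/a/", "out/a/", 2*blockSize)) // true false: write full blocks only
        fmt.Println(flushDecision("out/a/", "out/b/", 1000))        // true true: directory finished
    }
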
index 3261291b53650c516f7b58ca50668bfd27dca964..d1092e98ef50cd221fdbb9c18df0ad6efe8bdd96 100644 (file)
@@ -95,7 +95,7 @@ type ContainerRunner struct {
        Docker ThinDockerClient
 
        // Dispatcher client is initialized with the Dispatcher token.
-       // This is a priviledged token used to manage container status
+       // This is a privileged token used to manage container status
        // and logs.
        //
        // We have both dispatcherClient and DispatcherArvClient
@@ -850,21 +850,42 @@ func (runner *ContainerRunner) LogContainerRecord() error {
        return err
 }
 
-// LogNodeRecord logs arvados#node record corresponding to the current host.
+// LogNodeRecord logs the current host's InstanceType config entry (or
+// the arvados#node record, if running via crunch-dispatch-slurm).
 func (runner *ContainerRunner) LogNodeRecord() error {
-       hostname := os.Getenv("SLURMD_NODENAME")
-       if hostname == "" {
-               hostname, _ = os.Hostname()
-       }
-       _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
-               // The "info" field has admin-only info when obtained
-               // with a privileged token, and should not be logged.
-               node, ok := resp.(map[string]interface{})
-               if ok {
-                       delete(node, "info")
-               }
-       })
-       return err
+       if it := os.Getenv("InstanceType"); it != "" {
+               // Dispatched via arvados-dispatch-cloud. Save
+               // InstanceType config fragment received from

+               // dispatcher in the environment.
+               w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
+               if err != nil {
+                       return err
+               }
+               defer w.Close()
+               _, err = io.WriteString(w, it)
+               if err != nil {
+                       return err
+               }
+               return w.Close()
+       } else {
+               // Dispatched via crunch-dispatch-slurm. Look up
+               // apiserver's node record corresponding to
+               // $SLURMD_NODENAME.
+               hostname := os.Getenv("SLURMD_NODENAME")
+               if hostname == "" {
+                       hostname, _ = os.Hostname()
+               }
+               _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
+                       // The "info" field has admin-only info when
+                       // obtained with a privileged token, and
+                       // should not be logged.
+                       node, ok := resp.(map[string]interface{})
+                       if ok {
+                               delete(node, "info")
+                       }
+               })
+               return err
+       }
 }
 
 func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
index f9e0c1a505376b76e9242ac721a09d7a37fc17ba..3737e6b3befd775565e1d5df1e33ef1e2763d22b 100644 (file)
@@ -6,7 +6,6 @@ package main
 
 import (
        "encoding/json"
-       "fmt"
        "html"
        "html/template"
        "io"
@@ -22,10 +21,11 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       log "github.com/sirupsen/logrus"
+       "github.com/sirupsen/logrus"
        "golang.org/x/net/webdav"
 )
 
@@ -102,6 +102,7 @@ func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
 // are ignored (all response writes return the update error).
 type updateOnSuccess struct {
        httpserver.ResponseWriter
+       logger     logrus.FieldLogger
        update     func() error
        sentHeader bool
        err        error
@@ -126,7 +127,7 @@ func (uos *updateOnSuccess) WriteHeader(code int) {
                                if err, ok := uos.err.(*arvados.TransactionError); ok {
                                        code = err.StatusCode
                                }
-                               log.Printf("update() changes response to HTTP %d: %T %q", code, uos.err, uos.err)
+                               uos.logger.WithError(uos.err).Errorf("update() returned error type %T, changing response to HTTP %d", uos.err, code)
                                http.Error(uos.ResponseWriter, uos.err.Error(), code)
                                return
                        }
@@ -182,9 +183,6 @@ var (
 func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        h.setupOnce.Do(h.setup)
 
-       var statusCode = 0
-       var statusText string
-
        remoteAddr := r.RemoteAddr
        if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
                remoteAddr = xff + "," + remoteAddr
@@ -194,19 +192,6 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        }
 
        w := httpserver.WrapResponseWriter(wOrig)
-       defer func() {
-               if statusCode == 0 {
-                       statusCode = w.WroteStatus()
-               } else if w.WroteStatus() == 0 {
-                       w.WriteHeader(statusCode)
-               } else if w.WroteStatus() != statusCode {
-                       log.WithField("RequestID", r.Header.Get("X-Request-Id")).Warn(
-                               fmt.Sprintf("Our status changed from %d to %d after we sent headers", w.WroteStatus(), statusCode))
-               }
-               if statusText == "" {
-                       statusText = http.StatusText(statusCode)
-               }
-       }()
 
        if strings.HasPrefix(r.URL.Path, "/_health/") && r.Method == "GET" {
                h.healthHandler.ServeHTTP(w, r)
@@ -215,19 +200,18 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 
        if method := r.Header.Get("Access-Control-Request-Method"); method != "" && r.Method == "OPTIONS" {
                if !browserMethod[method] && !webdavMethod[method] {
-                       statusCode = http.StatusMethodNotAllowed
+                       w.WriteHeader(http.StatusMethodNotAllowed)
                        return
                }
                w.Header().Set("Access-Control-Allow-Headers", corsAllowHeadersHeader)
                w.Header().Set("Access-Control-Allow-Methods", "COPY, DELETE, GET, LOCK, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PROPPATCH, PUT, RMCOL, UNLOCK")
                w.Header().Set("Access-Control-Allow-Origin", "*")
                w.Header().Set("Access-Control-Max-Age", "86400")
-               statusCode = http.StatusOK
                return
        }
 
        if !browserMethod[r.Method] && !webdavMethod[r.Method] {
-               statusCode, statusText = http.StatusMethodNotAllowed, r.Method
+               w.WriteHeader(http.StatusMethodNotAllowed)
                return
        }
 
@@ -293,7 +277,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        }
 
        if collectionID == "" && !useSiteFS {
-               statusCode = http.StatusNotFound
+               w.WriteHeader(http.StatusNotFound)
                return
        }
 
@@ -368,7 +352,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 
        arv := h.clientPool.Get()
        if arv == nil {
-               statusCode, statusText = http.StatusInternalServerError, "Pool failed: "+h.clientPool.Err().Error()
+               http.Error(w, "client pool error: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
                return
        }
        defer h.clientPool.Put(arv)
@@ -392,7 +376,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        }
                }
                // Something more serious is wrong
-               statusCode, statusText = http.StatusInternalServerError, err.Error()
+               http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
                return
        }
        if collection == nil {
@@ -402,14 +386,14 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        // for additional credentials would just be
                        // confusing), or we don't even accept
                        // credentials at this path.
-                       statusCode = http.StatusNotFound
+                       w.WriteHeader(http.StatusNotFound)
                        return
                }
                for _, t := range reqTokens {
                        if tokenResult[t] == 404 {
                                // The client provided valid token(s), but the
                                // collection was not found.
-                               statusCode = http.StatusNotFound
+                               w.WriteHeader(http.StatusNotFound)
                                return
                        }
                }
@@ -423,13 +407,13 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                // data that has been deleted.  Allow a referrer to
                // provide this context somehow?
                w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
-               statusCode = http.StatusUnauthorized
+               w.WriteHeader(http.StatusUnauthorized)
                return
        }
 
        kc, err := keepclient.MakeKeepClient(arv)
        if err != nil {
-               statusCode, statusText = http.StatusInternalServerError, err.Error()
+               http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
                return
        }
        kc.RequestID = r.Header.Get("X-Request-Id")
@@ -448,14 +432,14 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 
        fs, err := collection.FileSystem(client, kc)
        if err != nil {
-               statusCode, statusText = http.StatusInternalServerError, err.Error()
+               http.Error(w, "error creating collection filesystem: "+err.Error(), http.StatusInternalServerError)
                return
        }
 
        writefs, writeOK := fs.(arvados.CollectionFileSystem)
        targetIsPDH := arvadosclient.PDHMatch(collectionID)
        if (targetIsPDH || !writeOK) && writeMethod[r.Method] {
-               statusCode, statusText = http.StatusMethodNotAllowed, errReadOnly.Error()
+               http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
                return
        }
 
@@ -467,6 +451,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        // collection can't be saved.
                        w = &updateOnSuccess{
                                ResponseWriter: w,
+                               logger:         ctxlog.FromContext(r.Context()),
                                update: func() error {
                                        return h.Config.Cache.Update(client, *collection, writefs)
                                }}
@@ -481,7 +466,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        LockSystem: h.webdavLS,
                        Logger: func(_ *http.Request, err error) {
                                if err != nil {
-                                       log.Printf("error from webdav handler: %q", err)
+                                       ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
                                }
                        },
                }
@@ -492,13 +477,13 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        openPath := "/" + strings.Join(targetPath, "/")
        if f, err := fs.Open(openPath); os.IsNotExist(err) {
                // Requested non-existent path
-               statusCode = http.StatusNotFound
+               w.WriteHeader(http.StatusNotFound)
        } else if err != nil {
                // Some other (unexpected) error
-               statusCode, statusText = http.StatusInternalServerError, err.Error()
+               http.Error(w, "open: "+err.Error(), http.StatusInternalServerError)
        } else if stat, err := f.Stat(); err != nil {
                // Can't get Size/IsDir (shouldn't happen with a collectionFS!)
-               statusCode, statusText = http.StatusInternalServerError, err.Error()
+               http.Error(w, "stat: "+err.Error(), http.StatusInternalServerError)
        } else if stat.IsDir() && !strings.HasSuffix(r.URL.Path, "/") {
                // If client requests ".../dirname", redirect to
                // ".../dirname/". This way, relative links in the
@@ -509,14 +494,14 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                h.serveDirectory(w, r, collection.Name, fs, openPath, true)
        } else {
                http.ServeContent(w, r, basename, stat.ModTime(), f)
-               if r.Header.Get("Range") == "" && int64(w.WroteBodyBytes()) != stat.Size() {
+               if wrote := int64(w.WroteBodyBytes()); wrote != stat.Size() && r.Header.Get("Range") == "" {
                        // If we wrote fewer bytes than expected, it's
                        // too late to change the real response code
                        // or send an error message to the client, but
                        // at least we can try to put some useful
                        // debugging info in the logs.
                        n, err := f.Read(make([]byte, 1024))
-                       statusCode, statusText = http.StatusInternalServerError, fmt.Sprintf("f.Size()==%d but only wrote %d bytes; read(1024) returns %d, %s", stat.Size(), w.WroteBodyBytes(), n, err)
+                       ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %s", stat.Size(), wrote, n, err)
 
                }
        }
@@ -542,7 +527,7 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
 
        kc, err := keepclient.MakeKeepClient(arv)
        if err != nil {
-               http.Error(w, err.Error(), http.StatusInternalServerError)
+               http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
                return
        }
        kc.RequestID = r.Header.Get("X-Request-Id")
@@ -583,7 +568,7 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
                LockSystem: h.webdavLS,
                Logger: func(_ *http.Request, err error) {
                        if err != nil {
-                               log.Printf("error from webdav handler: %q", err)
+                               ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
                        }
                },
        }
@@ -697,7 +682,7 @@ func (h *handler) serveDirectory(w http.ResponseWriter, r *http.Request, collect
                return nil
        }
        if err := walk(""); err != nil {
-               http.Error(w, err.Error(), http.StatusInternalServerError)
+               http.Error(w, "error getting directory listing: "+err.Error(), http.StatusInternalServerError)
                return
        }
 
@@ -708,7 +693,7 @@ func (h *handler) serveDirectory(w http.ResponseWriter, r *http.Request, collect
        }
        tmpl, err := template.New("dir").Funcs(funcs).Parse(dirListingTemplate)
        if err != nil {
-               http.Error(w, err.Error(), http.StatusInternalServerError)
+               http.Error(w, "error parsing template: "+err.Error(), http.StatusInternalServerError)
                return
        }
        sort.Slice(files, func(i, j int) bool {
@@ -748,7 +733,7 @@ func (h *handler) seeOtherWithCookie(w http.ResponseWriter, r *http.Request, loc
                        // into a cookie unless the current vhost
                        // (origin) serves only a single collection or
                        // we are in TrustAllContent mode.
-                       w.WriteHeader(http.StatusBadRequest)
+                       http.Error(w, "cannot serve inline content at this URL (possible configuration error; see https://doc.arvados.org/install/install-keep-web.html#dns)", http.StatusBadRequest)
                        return
                }
 
@@ -780,7 +765,7 @@ func (h *handler) seeOtherWithCookie(w http.ResponseWriter, r *http.Request, loc
        if location != "" {
                newu, err := u.Parse(location)
                if err != nil {
-                       w.WriteHeader(http.StatusInternalServerError)
+                       http.Error(w, "error resolving redirect target: "+err.Error(), http.StatusInternalServerError)
                        return
                }
                u = newu
index 34333d43424863c9ced8662a5f6867de6723e48b..aefd0fd08dd20c24a03182c6e967acfccbdcc6ae 100644 (file)
@@ -349,7 +349,7 @@ func (s *IntegrationSuite) TestVhostRedirectQueryTokenSingleOriginError(c *check
                "",
                "",
                http.StatusBadRequest,
-               "",
+               "cannot serve inline content at this URL (possible configuration error; see https://doc.arvados.org/install/install-keep-web.html#dns)\n",
        )
 }
 
@@ -424,7 +424,7 @@ func (s *IntegrationSuite) TestVhostRedirectQueryTokenAttachmentOnlyHost(c *chec
                "",
                "",
                http.StatusBadRequest,
-               "",
+               "cannot serve inline content at this URL (possible configuration error; see https://doc.arvados.org/install/install-keep-web.html#dns)\n",
        )
 
        resp := s.testVhostRedirectTokenToCookie(c, "GET",
index c589e639f557782ad4a44ffec4a40eaf6b399c94..006d2446396d1734cf5e7d9de735a1d339cb3798 100644 (file)
@@ -190,7 +190,7 @@ func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token str
        // Initialize the trashq and workers
        h.trashq = NewWorkQueue()
        for i := 0; i < 1 || i < h.Cluster.Collections.BlobTrashConcurrency; i++ {
-               go RunTrashWorker(h.volmgr, h.Cluster, h.trashq)
+               go RunTrashWorker(h.volmgr, h.Logger, h.Cluster, h.trashq)
        }
 
        // Set up routes and metrics
index 9d69b9fa47ef1b173b70e1f16617ba6dd9531351..54b4871fab89a59a2b95ba893912080ec13ff070 100644 (file)
@@ -892,10 +892,7 @@ func ExpectStatusCode(
        testname string,
        expectedStatus int,
        response *httptest.ResponseRecorder) {
-       if response.Code != expectedStatus {
-               c.Errorf("%s: expected status %d, got %+v",
-                       testname, expectedStatus, response)
-       }
+       c.Check(response.Code, check.Equals, expectedStatus, check.Commentf("%s", testname))
 }
 
 func ExpectBody(
@@ -1147,12 +1144,7 @@ func (s *HandlerSuite) TestUntrashHandler(c *check.C) {
                "",
                http.StatusOK,
                response)
-       expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
-       if response.Body.String() != expected {
-               c.Errorf(
-                       "Untrash response mismatched: expected %s, got:\n%s",
-                       expected, response.Body.String())
-       }
+       c.Check(response.Body.String(), check.Equals, "Successfully untrashed on: [MockVolume], [MockVolume]\n")
 }
 
 func (s *HandlerSuite) TestUntrashHandlerWithNoWritableVolumes(c *check.C) {
index 86504422d52f24f2659166e7cbfa975cb45772da..0fcc12144136ddb72ecfab6911dc1542ab0dcdf8 100644 (file)
@@ -11,7 +11,6 @@ import (
        "encoding/json"
        "fmt"
        "io"
-       "log"
        "net/http"
        "os"
        "regexp"
@@ -282,19 +281,15 @@ func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
 
        for _, v := range vols {
                if err := v.IndexTo(prefix, resp); err != nil {
-                       // We can't send an error message to the
-                       // client because we might have already sent
-                       // headers and index content. All we can do is
-                       // log the error in our own logs, and (in
-                       // cases where headers haven't been sent yet)
-                       // set a 500 status.
+                       // We can't send an error status/message to
+                       // the client because IndexTo() might have
+                       // already written body content. All we can do
+                       // is log the error in our own logs.
                        //
-                       // If headers have already been sent, the
-                       // client must notice the lack of trailing
+                       // The client must notice the lack of trailing
                        // newline as an indication that the response
                        // is incomplete.
-                       log.Printf("index error from volume %s: %s", v, err)
-                       http.Error(resp, "", http.StatusInternalServerError)
+                       ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
                        return
                }
        }
@@ -346,25 +341,25 @@ func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
        }
        var ds debugStats
        runtime.ReadMemStats(&ds.MemStats)
-       err := json.NewEncoder(resp).Encode(&ds)
+       data, err := json.Marshal(&ds)
        if err != nil {
-               http.Error(resp, err.Error(), 500)
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(data)
 }
 
 // StatusHandler addresses /status.json requests.
 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
        stLock.Lock()
        rtr.readNodeStatus(&st)
-       jstat, err := json.Marshal(&st)
+       data, err := json.Marshal(&st)
        stLock.Unlock()
-       if err == nil {
-               resp.Write(jstat)
-       } else {
-               log.Printf("json.Marshal: %s", err)
-               log.Printf("NodeStatus = %v", &st)
-               http.Error(resp, err.Error(), 500)
+       if err != nil {
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(data)
 }
 
 // populate the given NodeStatus struct with current values.
@@ -461,28 +456,19 @@ func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
                        continue
                } else {
                        result.Failed++
-                       log.Println("DeleteHandler:", err)
+                       ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
                }
        }
-
-       var st int
-
        if result.Deleted == 0 && result.Failed == 0 {
-               st = http.StatusNotFound
-       } else {
-               st = http.StatusOK
+               resp.WriteHeader(http.StatusNotFound)
+               return
        }
-
-       resp.WriteHeader(st)
-
-       if st == http.StatusOK {
-               if body, err := json.Marshal(result); err == nil {
-                       resp.Write(body)
-               } else {
-                       log.Printf("json.Marshal: %s (result = %v)", err, result)
-                       http.Error(resp, err.Error(), 500)
-               }
+       body, err := json.Marshal(result)
+       if err != nil {
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(body)
 }
 
 /* PullHandler processes "PUT /pull" requests for the data manager.
@@ -604,6 +590,7 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
+       log := ctxlog.FromContext(req.Context())
        hash := mux.Vars(req)["hash"]
 
        if len(rtr.volmgr.AllWritable()) == 0 {
@@ -619,27 +606,26 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
                if os.IsNotExist(err) {
                        numNotFound++
                } else if err != nil {
-                       log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+                       log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
                        failedOn = append(failedOn, vol.String())
                } else {
-                       log.Printf("Untrashed %v on volume %v", hash, vol.String())
+                       log.Infof("Untrashed %v on volume %v", hash, vol.String())
                        untrashedOn = append(untrashedOn, vol.String())
                }
        }
 
        if numNotFound == len(rtr.volmgr.AllWritable()) {
                http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
-               return
-       }
-
-       if len(failedOn) == len(rtr.volmgr.AllWritable()) {
+       } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
                http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
        } else {
-               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
                if len(failedOn) > 0 {
-                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
+                       http.Error(resp, respBody, http.StatusInternalServerError)
+               } else {
+                       fmt.Fprintln(resp, respBody)
                }
-               resp.Write([]byte(respBody))
        }
 }
 
@@ -663,6 +649,8 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
 // DiskHashError.
 //
 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+       log := ctxlog.FromContext(ctx)
+
        // Attempt to read the requested hash from a keep volume.
        errorToCaller := NotFoundError
 
@@ -680,7 +668,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
                        // volumes. If all volumes report IsNotExist,
                        // we return a NotFoundError.
                        if !os.IsNotExist(err) {
-                               log.Printf("%s: Get(%s): %s", vol, hash, err)
+                               log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
                        }
                        // If some volume returns a transient error, return it to the caller
                        // instead of "Not found" so it can retry.
@@ -690,19 +678,16 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
                        continue
                }
                // Check the file checksum.
-               //
                filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
                if filehash != hash {
                        // TODO: Try harder to tell a sysadmin about
                        // this.
-                       log.Printf("%s: checksum mismatch for request %s (actual %s)",
-                               vol, hash, filehash)
+                       log.Errorf("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
                        errorToCaller = DiskHashError
                        continue
                }
                if errorToCaller == DiskHashError {
-                       log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
-                               vol, hash)
+               log.Warnf("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
                }
                return size, nil
        }
@@ -737,6 +722,8 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 //          provide as much detail as possible.
 //
 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+       log := ctxlog.FromContext(ctx)
+
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
@@ -756,17 +743,19 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if mnt := volmgr.NextWritable(); mnt != nil {
-               if err := mnt.Put(ctx, hash, block); err == nil {
+               if err := mnt.Put(ctx, hash, block); err != nil {
+                       log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+               } else {
                        return mnt.Replication, nil // success!
                }
-               if ctx.Err() != nil {
-                       return 0, ErrClientDisconnect
-               }
+       }
+       if ctx.Err() != nil {
+               return 0, ErrClientDisconnect
        }
 
        writables := volmgr.AllWritable()
        if len(writables) == 0 {
-               log.Print("No writable volumes.")
+               log.Error("no writable volumes")
                return 0, FullError
        }
 
@@ -776,20 +765,22 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
                if ctx.Err() != nil {
                        return 0, ErrClientDisconnect
                }
-               if err == nil {
+               switch err {
+               case nil:
                        return vol.Replication, nil // success!
-               }
-               if err != FullError {
+               case FullError:
+                       continue
+               default:
                        // The volume is not full but the
                        // write did not succeed.  Report the
                        // error and continue trying.
                        allFull = false
-                       log.Printf("%s: Write(%s): %s", vol, hash, err)
+                       log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
                }
        }
 
        if allFull {
-               log.Print("All volumes are full.")
+               log.Error("all volumes are full")
                return 0, FullError
        }
        // Already logged the non-full errors.
@@ -802,6 +793,7 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+       log := ctxlog.FromContext(ctx)
        var bestErr error = NotFoundError
        for _, mnt := range volmgr.AllWritable() {
                err := mnt.Compare(ctx, hash, buf)
@@ -813,7 +805,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        // to tell which one is wanted if we have
                        // both, so there's no point writing it even
                        // on a different volume.)
-                       log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+                       log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
                        return 0, err
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
@@ -823,11 +815,11 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        // Couldn't open file, data is corrupt on
                        // disk, etc.: log this abnormal condition,
                        // and try the next volume.
-                       log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+                       log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
                        continue
                }
                if err := mnt.Touch(hash); err != nil {
-                       log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
+                       log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
                        bestErr = err
                        continue
                }
@@ -861,18 +853,6 @@ func GetAPIToken(req *http.Request) string {
        return ""
 }
 
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestampHex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestampHex string) bool {
-       ts, err := strconv.ParseInt(timestampHex, 16, 0)
-       if err != nil {
-               log.Printf("IsExpired: %s", err)
-               return true
-       }
-       return time.Unix(ts, 0).Before(time.Now())
-}
-
 // canDelete returns true if the user identified by apiToken is
 // allowed to delete blocks.
 func (rtr *router) canDelete(apiToken string) bool {
index 220377af280f2d64c682624ee69a67cfd6f3b636..08cc591fc52f8a54c34cb426c03a2835f23c129e 100644 (file)
@@ -16,7 +16,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net/http"
        "os"
        "regexp"
@@ -37,11 +36,12 @@ func init() {
 }
 
 func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
-       v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
+       v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
        err := json.Unmarshal(volume.DriverParameters, &v)
        if err != nil {
                return nil, err
        }
+       v.logger = logger.WithField("Volume", v.String())
        return v, v.check()
 }
 
@@ -340,7 +340,7 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
 
        rdr, err = v.bucket.GetReader(loc)
        if err != nil {
-               log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
+               v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
                err = v.translateError(err)
        }
        return
@@ -465,7 +465,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
        go func() {
                defer func() {
                        if ctx.Err() != nil {
-                               v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+                               v.logger.Debugf("abandoned PutReader goroutine finished with err: %s", err)
                        }
                }()
                defer close(ready)
@@ -477,7 +477,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
        }()
        select {
        case <-ctx.Done():
-               v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
+               v.logger.Debugf("taking PutReader's input away: %s", ctx.Err())
                // Our pipe might be stuck in Write(), waiting for
                // PutReader() to read. If so, un-stick it. This means
                // PutReader will get corrupt data, but that's OK: the
@@ -485,7 +485,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
                go io.Copy(ioutil.Discard, bufr)
                // CloseWithError() will return once pending I/O is done.
                bufw.CloseWithError(ctx.Err())
-               v.logger.Debugf("%s: abandoning PutReader goroutine", v)
+               v.logger.Debugf("abandoning PutReader goroutine")
                return ctx.Err()
        case <-ready:
                // Unblock pipe in case PutReader did not consume it.
@@ -523,13 +523,13 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
                // The data object X exists, but recent/X is missing.
                err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
                if err != nil {
-                       log.Printf("error: creating %q: %s", "recent/"+loc, err)
+                       v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
                        return zeroTime, v.translateError(err)
                }
-               log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
+               v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
                resp, err = v.bucket.Head("recent/"+loc, nil)
                if err != nil {
-                       log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
+                       v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
                        return zeroTime, v.translateError(err)
                }
        } else if err != nil {
@@ -544,12 +544,14 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
        // Use a merge sort to find matching sets of X and recent/X.
        dataL := s3Lister{
+               Logger:   v.logger,
                Bucket:   v.bucket.Bucket(),
                Prefix:   prefix,
                PageSize: v.IndexPageSize,
                Stats:    &v.bucket.stats,
        }
        recentL := s3Lister{
+               Logger:   v.logger,
                Bucket:   v.bucket.Bucket(),
                Prefix:   "recent/" + prefix,
                PageSize: v.IndexPageSize,
@@ -744,24 +746,24 @@ func (v *S3Volume) fixRace(loc string) bool {
        trash, err := v.bucket.Head("trash/"+loc, nil)
        if err != nil {
                if !os.IsNotExist(v.translateError(err)) {
-                       log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
+                       v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
                }
                return false
        }
        trashTime, err := v.lastModified(trash)
        if err != nil {
-               log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
+               v.logger.WithError(err).Errorf("fixRace: error parsing time %q", trash.Header.Get("Last-Modified"))
                return false
        }
 
        recent, err := v.bucket.Head("recent/"+loc, nil)
        if err != nil {
-               log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
+               v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
                return false
        }
        recentTime, err := v.lastModified(recent)
        if err != nil {
-               log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
+               v.logger.WithError(err).Errorf("fixRace: error parsing time %q", recent.Header.Get("Last-Modified"))
                return false
        }
 
@@ -772,11 +774,11 @@ func (v *S3Volume) fixRace(loc string) bool {
                return false
        }
 
-       log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
-       log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
+       v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+       v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
        err = v.safeCopy(loc, "trash/"+loc)
        if err != nil {
-               log.Printf("error: fixRace: %s", err)
+               v.logger.WithError(err).Error("fixRace: copy failed")
                return false
        }
        return true
@@ -819,24 +821,24 @@ func (v *S3Volume) EmptyTrash() {
 
                trashT, err := time.Parse(time.RFC3339, trash.LastModified)
                if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+                       v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err)
                        return
                }
                recent, err := v.bucket.Head("recent/"+loc, nil)
                if err != nil && os.IsNotExist(v.translateError(err)) {
-                       log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
+                       v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
                        err = v.Untrash(loc)
                        if err != nil {
-                               log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
+                               v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
                        }
                        return
                } else if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+                       v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
                        return
                }
                recentT, err := v.lastModified(recent)
                if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
+                       v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+loc, recent.Header.Get("Last-Modified"))
                        return
                }
                if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
@@ -849,18 +851,18 @@ func (v *S3Volume) EmptyTrash() {
                                // Note this means (TrashSweepInterval
                                // < BlobSigningTTL - raceWindow) is
                                // necessary to avoid starvation.
-                               log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
+                               v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
                                v.fixRace(loc)
                                v.Touch(loc)
                                return
                        }
                        _, err := v.bucket.Head(loc, nil)
                        if os.IsNotExist(err) {
-                               log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
+                               v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
                                v.fixRace(loc)
                                return
                        } else if err != nil {
-                               log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+                               v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
                                return
                        }
                }
@@ -869,7 +871,7 @@ func (v *S3Volume) EmptyTrash() {
                }
                err = v.bucket.Del(trash.Key)
                if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
+                       v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", trash.Key)
                        return
                }
                atomic.AddInt64(&bytesDeleted, trash.Size)
@@ -877,16 +879,16 @@ func (v *S3Volume) EmptyTrash() {
 
                _, err = v.bucket.Head(loc, nil)
                if err == nil {
-                       log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+                       v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
                        return
                }
                if !os.IsNotExist(v.translateError(err)) {
-                       log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+                       v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
                        return
                }
                err = v.bucket.Del("recent/" + loc)
                if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+                       v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
                }
        }
 
@@ -903,6 +905,7 @@ func (v *S3Volume) EmptyTrash() {
        }
 
        trashL := s3Lister{
+               Logger:   v.logger,
                Bucket:   v.bucket.Bucket(),
                Prefix:   "trash/",
                PageSize: v.IndexPageSize,
@@ -915,12 +918,13 @@ func (v *S3Volume) EmptyTrash() {
        wg.Wait()
 
        if err := trashL.Error(); err != nil {
-               log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
+               v.logger.WithError(err).Error("EmptyTrash: lister failed")
        }
-       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+       v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
 
 type s3Lister struct {
+       Logger     logrus.FieldLogger
        Bucket     *s3.Bucket
        Prefix     string
        PageSize   int
@@ -967,7 +971,7 @@ func (lister *s3Lister) getPage() {
        lister.buf = make([]s3.Key, 0, len(resp.Contents))
        for _, key := range resp.Contents {
                if !strings.HasPrefix(key.Key, lister.Prefix) {
-                       log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
+                       lister.Logger.Warnf("s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
                        continue
                }
                lister.buf = append(lister.buf, key)
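
The hunks above replace package-level log.Printf calls in the S3 volume code with a per-volume logrus FieldLogger. Below is a minimal, self-contained sketch of that pattern (not Arvados code; the "Volume" field value and the fakeHead helper are illustrative assumptions): the volume field is attached once with WithField, and each call site attaches the error with WithError instead of formatting it into the message.

package main

import (
	"errors"

	"github.com/sirupsen/logrus"
)

func fakeHead(key string) error {
	// Stand-in for a bucket HEAD request that fails.
	return errors.New("404 Not Found")
}

func main() {
	logger := logrus.New().WithField("Volume", "s3-bucket-example")

	if err := fakeHead("trash/abc123"); err != nil {
		// One structured entry carries both the Volume field and the error, e.g.:
		// level=error msg="fixRace: HEAD \"trash/abc123\" failed" Volume=s3-bucket-example error="404 Not Found"
		logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/abc123")
	}
}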
index 49ea24aa03b8cee1903a2de01010b061085b4528..dbd6a45ed9629c14346ecacca7adb4cff001e809 100644 (file)
@@ -11,7 +11,6 @@ import (
        "encoding/json"
        "fmt"
        "io"
-       "log"
        "net/http"
        "net/http/httptest"
        "os"
@@ -499,11 +498,11 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
        err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
-               log.Printf("PutRaw: %s: %+v", loc, err)
+               v.logger.Printf("PutRaw: %s: %+v", loc, err)
        }
        err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
-               log.Printf("PutRaw: recent/%s: %+v", loc, err)
+               v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
        }
 }
 
index ba1455ac657bca2f05dd808a23936af83a425e20..3b1bd042305646e766a2d5a128b79e65816e0eab 100644 (file)
@@ -6,10 +6,10 @@ package main
 
 import (
        "errors"
-       "log"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
 )
 
 // RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
@@ -18,19 +18,19 @@ import (
 //      Delete the block indicated by the trash request Locator
 //             Repeat
 //
-func RunTrashWorker(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashq *WorkQueue) {
+func RunTrashWorker(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashq *WorkQueue) {
        for item := range trashq.NextItem {
                trashRequest := item.(TrashRequest)
-               TrashItem(volmgr, cluster, trashRequest)
+               TrashItem(volmgr, logger, cluster, trashRequest)
                trashq.DoneItem <- struct{}{}
        }
 }
 
 // TrashItem deletes the indicated block from every writable volume.
-func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest TrashRequest) {
+func TrashItem(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashRequest TrashRequest) {
        reqMtime := time.Unix(0, trashRequest.BlockMtime)
        if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() {
-               log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
+               logger.Warnf("client asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
                        arvados.Duration(time.Since(reqMtime)),
                        trashRequest.Locator,
                        trashRequest.BlockMtime,
@@ -43,7 +43,7 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
        if uuid := trashRequest.MountUUID; uuid == "" {
                volumes = volmgr.AllWritable()
        } else if mnt := volmgr.Lookup(uuid, true); mnt == nil {
-               log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
+               logger.Warnf("trash request for nonexistent mount: %v", trashRequest)
                return
        } else {
                volumes = []*VolumeMount{mnt}
@@ -52,11 +52,11 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
        for _, volume := range volumes {
                mtime, err := volume.Mtime(trashRequest.Locator)
                if err != nil {
-                       log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+                       logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
                        continue
                }
                if trashRequest.BlockMtime != mtime.UnixNano() {
-                       log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
+                       logger.Infof("%v Trash(%v): stored mtime %v does not match trash list value %v; skipping", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
                        continue
                }
 
@@ -67,9 +67,9 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
                }
 
                if err != nil {
-                       log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+                       logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
                } else {
-                       log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator)
+                       logger.Infof("%v Trash(%v) OK", volume, trashRequest.Locator)
                }
        }
 }
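
RunTrashWorker and TrashItem now take an injected logrus.FieldLogger instead of using the global log package, and TrashItem refuses to delete a block whose recorded mtime is still within the blob signing TTL. A small sketch of that guard follows; shouldTrash, its arguments, and the TTL value are invented for illustration and are not the real function.

package main

import (
	"time"

	"github.com/sirupsen/logrus"
)

// shouldTrash mirrors the age guard: refuse the request while the block's
// recorded mtime is still inside the blob signing TTL.
func shouldTrash(logger logrus.FieldLogger, blockMtime int64, signingTTL time.Duration) bool {
	reqMtime := time.Unix(0, blockMtime)
	if time.Since(reqMtime) < signingTTL {
		logger.Warnf("client asked to delete a %v old block (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
			time.Since(reqMtime), blockMtime, reqMtime, signingTTL)
		return false
	}
	return true
}

func main() {
	logger := logrus.New()
	recent := time.Now().Add(-time.Hour).UnixNano()
	_ = shouldTrash(logger, recent, 14*24*time.Hour) // false: the block is only an hour old
}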
index bd3743090ab90f1c4dfd6434136cd1776a94ea22..c2052077fedeacf82754ce426297185ae917bd00 100644 (file)
@@ -9,6 +9,7 @@ import (
        "context"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
@@ -291,7 +292,7 @@ func (s *HandlerSuite) performTrashWorkerTest(c *check.C, testData TrashWorkerTe
                        }
                }
        }
-       go RunTrashWorker(s.handler.volmgr, s.cluster, trashq)
+       go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq)
 
        // Install gate so all local operations block until we say go
        gate := make(chan struct{})
index 6504f9c16b166cf7d5222f59988939beff878802..f41bd30d3d10045d715786bcb6c84c68356c3afd 100644 (file)
@@ -11,7 +11,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "os"
        "os/exec"
        "path/filepath"
@@ -38,6 +37,7 @@ func newDirectoryVolume(cluster *arvados.Cluster, volume arvados.Volume, logger
        if err != nil {
                return nil, err
        }
+       v.logger = v.logger.WithField("Volume", v.String())
        return v, v.check()
 }
 
@@ -84,7 +84,7 @@ type UnixVolume struct {
 // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
 func (v *UnixVolume) GetDeviceID() string {
        giveup := func(f string, args ...interface{}) string {
-               log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...)
+               v.logger.Infof(f+"; using blank DeviceID for volume %s", append(args, v)...)
                return ""
        }
        buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
@@ -143,7 +143,7 @@ func (v *UnixVolume) GetDeviceID() string {
                link := filepath.Join(udir, uuid)
                fi, err = os.Stat(link)
                if err != nil {
-                       log.Printf("error: stat %q: %s", link, err)
+                       v.logger.WithError(err).Errorf("stat(%q) failed", link)
                        continue
                }
                if fi.Sys().(*syscall.Stat_t).Ino == ino {
@@ -271,15 +271,12 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
        }
        bdir := v.blockDir(loc)
        if err := os.MkdirAll(bdir, 0755); err != nil {
-               log.Printf("%s: could not create directory %s: %s",
-                       loc, bdir, err)
-               return err
+               return fmt.Errorf("error creating directory %s: %s", bdir, err)
        }
 
        tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc)
        if tmperr != nil {
-               log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
-               return tmperr
+               return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, tmperr)
        }
 
        bpath := v.blockPath(loc)
@@ -291,19 +288,20 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
        n, err := io.Copy(tmpfile, rdr)
        v.os.stats.TickOutBytes(uint64(n))
        if err != nil {
-               log.Printf("%s: writing to %s: %s", v, bpath, err)
+               err = fmt.Errorf("error writing %s: %s", bpath, err)
                tmpfile.Close()
                v.os.Remove(tmpfile.Name())
                return err
        }
        if err := tmpfile.Close(); err != nil {
-               log.Printf("closing %s: %s", tmpfile.Name(), err)
+               err = fmt.Errorf("error closing %s: %s", tmpfile.Name(), err)
                v.os.Remove(tmpfile.Name())
                return err
        }
        if err := v.os.Rename(tmpfile.Name(), bpath); err != nil {
-               log.Printf("rename %s %s: %s", tmpfile.Name(), bpath, err)
-               return v.os.Remove(tmpfile.Name())
+               err = fmt.Errorf("error renaming %s to %s: %s", tmpfile.Name(), bpath, err)
+               v.os.Remove(tmpfile.Name())
+               return err
        }
        return nil
 }
@@ -314,14 +312,14 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
 func (v *UnixVolume) Status() *VolumeStatus {
        fi, err := v.os.Stat(v.Root)
        if err != nil {
-               log.Printf("%s: os.Stat: %s", v, err)
+               v.logger.WithError(err).Error("stat failed")
                return nil
        }
        devnum := fi.Sys().(*syscall.Stat_t).Dev
 
        var fs syscall.Statfs_t
        if err := syscall.Statfs(v.Root, &fs); err != nil {
-               log.Printf("%s: statfs: %s", v, err)
+               v.logger.WithError(err).Error("statfs failed")
                return nil
        }
        // These calculations match the way df calculates disk usage:
@@ -380,8 +378,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                blockdirpath := filepath.Join(v.Root, names[0])
                blockdir, err := v.os.Open(blockdirpath)
                if err != nil {
-                       log.Print("Error reading ", blockdirpath, ": ", err)
-                       lastErr = err
+                       v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+                       lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
                        continue
                }
                v.os.stats.TickOps("readdir")
@@ -391,8 +389,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        if err == io.EOF {
                                break
                        } else if err != nil {
-                               log.Print("Error reading ", blockdirpath, ": ", err)
-                               lastErr = err
+                               v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+                               lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
                                break
                        }
                        name := fileInfo[0].Name()
@@ -408,9 +406,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                                " ", fileInfo[0].ModTime().UnixNano(),
                                "\n")
                        if err != nil {
-                               log.Print("Error writing : ", err)
-                               lastErr = err
-                               break
+                               blockdir.Close()
+                               return fmt.Errorf("error writing: %s", err)
                        }
                }
                blockdir.Close()
@@ -534,7 +531,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
        if avail, err := v.FreeDiskSpace(); err == nil {
                isFull = avail < MinFreeKilobytes
        } else {
-               log.Printf("%s: FreeDiskSpace: %s", v, err)
+               v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v)
                isFull = false
        }
 
@@ -584,7 +581,7 @@ func (v *UnixVolume) lock(ctx context.Context) error {
        }()
        select {
        case <-ctx.Done():
-               log.Printf("%s: client hung up while waiting for Serialize lock (%s)", v, time.Since(t0))
+               v.logger.Infof("client hung up while waiting for Serialize lock (%s)", time.Since(t0))
                go func() {
                        <-locked
                        v.locker.Unlock()
@@ -653,7 +650,7 @@ func (v *UnixVolume) EmptyTrash() {
                }
                deadline, err := strconv.ParseInt(matches[2], 10, 64)
                if err != nil {
-                       log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
+                       v.logger.WithError(err).Errorf("EmptyTrash: %v: ParseInt(%q) failed", path, matches[2])
                        return
                }
                atomic.AddInt64(&bytesInTrash, info.Size())
@@ -663,7 +660,7 @@ func (v *UnixVolume) EmptyTrash() {
                }
                err = v.os.Remove(path)
                if err != nil {
-                       log.Printf("EmptyTrash: Remove %v: %v", path, err)
+                       v.logger.WithError(err).Errorf("EmptyTrash: Remove(%q) failed", path)
                        return
                }
                atomic.AddInt64(&bytesDeleted, info.Size())
@@ -688,7 +685,7 @@ func (v *UnixVolume) EmptyTrash() {
 
        err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
                if err != nil {
-                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
+                       v.logger.WithError(err).Errorf("EmptyTrash: filepath.Walk(%q) failed", path)
                        return nil
                }
                todo <- dirent{path, info}
@@ -698,10 +695,10 @@ func (v *UnixVolume) EmptyTrash() {
        wg.Wait()
 
        if err != nil {
-               log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+               v.logger.WithError(err).Error("EmptyTrash failed")
        }
 
-       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+       v.logger.Infof("EmptyTrash stats: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
 
 type unixStats struct {
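
In the UnixVolume hunks above, WriteBlock stops logging its own failures and instead returns errors wrapped with fmt.Errorf, leaving the caller (which holds the volume-scoped logger) to decide how to report them. A rough sketch of that division of responsibility, assuming the writeTemp helper and its arguments, which are not part of the real code:

package main

import (
	"fmt"
	"io/ioutil"

	"github.com/sirupsen/logrus"
)

// writeTemp wraps each failure with context and returns it, rather than
// logging from inside the helper.
func writeTemp(dir, name string, data []byte) error {
	tmpfile, err := ioutil.TempFile(dir, name)
	if err != nil {
		return fmt.Errorf("TempFile(%s, %s) failed: %s", dir, name, err)
	}
	defer tmpfile.Close()
	if _, err := tmpfile.Write(data); err != nil {
		return fmt.Errorf("error writing %s: %s", tmpfile.Name(), err)
	}
	return nil
}

func main() {
	logger := logrus.New().WithField("Volume", "unix-example")
	if err := writeTemp("/nonexistent-dir", "tmpblock", []byte("data")); err != nil {
		// The caller attaches the volume context; the wrapped error says what failed.
		logger.WithError(err).Error("WriteBlock failed")
	}
}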
index 1ffc46513cb571b94e4953ba231ba62983c99398..664956f7bcaa91c44f306481f17546ab7caab3e1 100644 (file)
@@ -19,6 +19,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
@@ -90,6 +91,7 @@ func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Clu
                        Root:    d,
                        locker:  locker,
                        cluster: cluster,
+                       logger:  ctxlog.TestLogger(c),
                        volume:  volume,
                        metrics: metrics,
                },
index 9e02d41ca8bb6212684a3fd33b228b7fbffe390f..4d7874d42c26ac8ef03eb07ecc3df5063f195b05 100644 (file)
@@ -6,7 +6,6 @@
 Description=Arvados websocket server
 Documentation=https://doc.arvados.org/
 After=network.target
-AssertPathExists=/etc/arvados/ws/ws.yml
 
 # systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
 StartLimitInterval=0
index 2081c2ae1509b86305380b1e97803b1104c65630..8fd088a13741b190b828ed32ceb0c9dab00ecf50 100755 (executable)
@@ -34,6 +34,11 @@ if ! test -s /var/lib/arvados/management_token ; then
 fi
 management_token=$(cat /var/lib/arvados/management_token)
 
+if ! test -s /var/lib/arvados/system_root_token ; then
+    ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/system_root_token
+fi
+system_root_token=$(cat /var/lib/arvados/system_root_token)
+
 if ! test -s /var/lib/arvados/sso_app_secret ; then
     ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/sso_app_secret
 fi
@@ -68,6 +73,7 @@ fi
 cat >/var/lib/arvados/cluster_config.yml <<EOF
 Clusters:
   ${uuid_prefix}:
+    SystemRootToken: $system_root_token
     ManagementToken: $management_token
     Services:
       RailsAPI:
index 9d29eb9f143e8be2d9b88857e093524c73ee3c99..66f249510d07600094b5991043d27661d084648f 100644 (file)
@@ -11,11 +11,12 @@ export npm_config_cache_min=Infinity
 export R_LIBS=/var/lib/Rlibs
 export HOME=$(getent passwd arvbox | cut -d: -f6)
 
+defaultdev=$(/sbin/ip route|awk '/default/ { print $5 }')
+containerip=$(ip addr show $defaultdev | grep 'inet ' | sed 's/ *inet \(.*\)\/.*/\1/')
 if test -s /var/run/localip_override ; then
     localip=$(cat /var/run/localip_override)
 else
-    defaultdev=$(/sbin/ip route|awk '/default/ { print $5 }')
-    localip=$(ip addr show $defaultdev | grep 'inet ' | sed 's/ *inet \(.*\)\/.*/\1/')
+    localip=$containerip
 fi
 
 root_cert=/var/lib/arvados/root-cert.pem
index 1b28a8d014a9fbefc3291440b781d96986fd4caa..5f1e6204fa57a0fff08a3626cdb3e772a777ffe0 100755 (executable)
@@ -8,6 +8,12 @@ set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
 
+if [[ $containerip != $localip ]] ; then
+    if ! grep -q $localip /etc/hosts ; then
+       echo $containerip $localip >> /etc/hosts
+    fi
+fi
+
 openssl verify -CAfile $root_cert $server_cert
 
 cat <<EOF >/var/lib/arvados/nginx.conf
@@ -31,7 +37,7 @@ http {
      geo \$external_client {
           default     1;
           127.0.0.0/8 0;
-          $localip/32 0;
+          $containerip/32 0;
      }
 
      server {
index c146a000325cb25b27cbe30e5e37c80b9144b0bf..b449e2f12bc8394e0c7e92d00c84100cf8c18ec5 100644 (file)
@@ -1,6 +1,6 @@
 {
        "comment": "",
-       "ignore": "test",
+       "ignore": "test appengine",
        "package": [
                {
                        "checksumSHA1": "jfYWZyRWLMfG0J5K7G2K8a9AKfs=",
                        "revision": "4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9",
                        "revisionTime": "2016-08-04T10:47:26Z"
                },
+               {
+                       "checksumSHA1": "bNT5FFLDUXSamYK3jGHSwsTJqqo=",
+                       "path": "github.com/coreos/go-oidc",
+                       "revision": "2be1c5b8a260760503f66dc0996e102b683b3ac3",
+                       "revisionTime": "2019-08-15T17:57:29Z"
+               },
                {
                        "checksumSHA1": "+Zz+leZHHC9C0rx8DoRuffSRPso=",
                        "path": "github.com/coreos/go-systemd/daemon",
                        "revision": "e881fd58d78e04cf6d0de1217f8707c8cc2249bc",
                        "revisionTime": "2017-12-16T07:03:16Z"
                },
+               {
+                       "checksumSHA1": "KxkAlLxQkuSGHH46Dxu6wpAybO4=",
+                       "path": "github.com/pquerna/cachecontrol",
+                       "revision": "1555304b9b35fdd2b425bccf1a5613677705e7d0",
+                       "revisionTime": "2018-05-17T16:36:45Z"
+               },
+               {
+                       "checksumSHA1": "wwaht1P9i8vQu6DqNvMEy24IMgY=",
+                       "path": "github.com/pquerna/cachecontrol/cacheobject",
+                       "revision": "1555304b9b35fdd2b425bccf1a5613677705e7d0",
+                       "revisionTime": "2018-05-17T16:36:45Z"
+               },
                {
                        "checksumSHA1": "Ajt29IHVbX99PUvzn8Gc/lMCXBY=",
                        "path": "github.com/prometheus/client_golang/prometheus",
                        "revisionTime": "2017-11-10T11:01:46Z"
                },
                {
-                       "checksumSHA1": "ySaT8G3I3y4MmnoXOYAAX0rC+p8=",
+                       "checksumSHA1": "umeXHK5iK/3th4PtrTkZllezgWo=",
                        "path": "github.com/sirupsen/logrus",
                        "revision": "d682213848ed68c0a260ca37d6dd5ace8423f5ba",
                        "revisionTime": "2017-12-05T20:32:29Z"
                        "revision": "0fcca4842a8d74bfddc2c96a073bd2a4d2a7a2e8",
                        "revisionTime": "2017-11-25T19:00:56Z"
                },
+               {
+                       "checksumSHA1": "1MGpGDQqnUoRpv7VEcQrXOBydXE=",
+                       "path": "golang.org/x/crypto/pbkdf2",
+                       "revision": "ae8bce0030810cf999bb2b9868ae5c7c58e6343b",
+                       "revisionTime": "2018-04-30T17:54:52Z"
+               },
                {
                        "checksumSHA1": "PJY7uCr3UnX4/Mf/RoWnbieSZ8o=",
                        "path": "golang.org/x/crypto/pkcs12",
                        "revision": "434ec0c7fe3742c984919a691b2018a6e9694425",
                        "revisionTime": "2017-09-25T09:26:47Z"
                },
+               {
+                       "checksumSHA1": "+33kONpAOtjMyyw0uD4AygLvIXg=",
+                       "path": "golang.org/x/oauth2",
+                       "revision": "ec22f46f877b4505e0117eeaab541714644fdd28",
+                       "revisionTime": "2018-05-28T20:23:04Z"
+               },
+               {
+                       "checksumSHA1": "fddd1btmbXxnlMKHUZewlVlSaEQ=",
+                       "path": "golang.org/x/oauth2/internal",
+                       "revision": "ec22f46f877b4505e0117eeaab541714644fdd28",
+                       "revisionTime": "2018-05-28T20:23:04Z"
+               },
                {
                        "checksumSHA1": "znPq37/LZ4pJh7B4Lbu0ZuoMhNk=",
                        "origin": "github.com/docker/docker/vendor/golang.org/x/sys/unix",
                        "revision": "20d25e2804050c1cd24a7eea1e7a6447dd0e74ec",
                        "revisionTime": "2016-12-08T18:13:25Z"
                },
+               {
+                       "checksumSHA1": "oRfTuL23MIBG2nCwjweTJz4Eiqg=",
+                       "path": "gopkg.in/square/go-jose.v2",
+                       "revision": "730df5f748271903322feb182be83b43ebbbe27d",
+                       "revisionTime": "2019-04-10T21:58:30Z"
+               },
+               {
+                       "checksumSHA1": "Ho5sr2GbiR8S35IRni7vC54d5Js=",
+                       "path": "gopkg.in/square/go-jose.v2/cipher",
+                       "revision": "730df5f748271903322feb182be83b43ebbbe27d",
+                       "revisionTime": "2019-04-10T21:58:30Z"
+               },
+               {
+                       "checksumSHA1": "JFun0lWY9eqd80Js2iWsehu1gc4=",
+                       "path": "gopkg.in/square/go-jose.v2/json",
+                       "revision": "730df5f748271903322feb182be83b43ebbbe27d",
+                       "revisionTime": "2019-04-10T21:58:30Z"
+               },
                {
                        "checksumSHA1": "GdsHg+yOsZtdMvD9HJFovPsqKec=",
                        "path": "gopkg.in/src-d/go-billy.v4",