Merge branch '19269-all-users-writable'
author Tom Clegg <tom@curii.com>
Thu, 25 Aug 2022 19:50:14 +0000 (15:50 -0400)
committer Tom Clegg <tom@curii.com>
Thu, 25 Aug 2022 19:50:14 +0000 (15:50 -0400)
fixes #19269

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

68 files changed:
.licenseignore
SECURITY.md [new file with mode: 0644]
apps/workbench/app/controllers/actions_controller.rb
apps/workbench/app/controllers/application_controller.rb
apps/workbench/app/helpers/application_helper.rb
apps/workbench/app/models/arvados_api_client.rb
apps/workbench/test/test_helper.rb
cmd/arvados-client/container_gateway.go
cmd/arvados-client/container_gateway_test.go
cmd/arvados-server/cmd.go
doc/_config.yml
doc/admin/config-urls.html.textile.liquid
doc/admin/upgrading.html.textile.liquid
doc/api/methods/container_requests.html.textile.liquid
doc/api/methods/containers.html.textile.liquid
doc/architecture/hpc.html.textile.liquid [new file with mode: 0644]
doc/install/arvbox.html.textile.liquid
go.mod
go.sum
lib/config/load.go
lib/controller/federation/conn.go
lib/controller/integration_test.go
lib/controller/localdb/conn.go
lib/controller/localdb/container_gateway.go
lib/controller/localdb/container_gateway_test.go
lib/controller/proxy.go
lib/controller/router/request.go
lib/controller/router/router.go
lib/controller/rpc/conn.go
lib/crunchrun/container_gateway.go
lib/crunchrun/crunchrun.go
lib/crunchrun/executor_test.go
lib/crunchrun/integration_test.go
lib/diagnostics/cmd.go
lib/diagnostics/hello-world.tar [new file with mode: 0644]
lib/install/deps.go
lib/install/init.go
lib/lsf/dispatch.go
sdk/cli/bin/arv
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/go/arvados/api.go
sdk/go/arvados/container.go
sdk/go/arvados/container_gateway.go
sdk/go/arvadostest/api.go
sdk/python/arvados/collection.py
sdk/python/tests/run_test_server.py
sdk/python/tests/test_collections.py
services/api/app/controllers/arvados/v1/groups_controller.rb
services/api/app/models/container.rb
services/api/app/models/container_request.rb
services/api/db/migrate/20220804133317_add_cost_to_containers.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/lib/record_filters.rb
services/api/test/functional/arvados/v1/filters_test.rb
services/api/test/unit/container_request_test.rb
services/api/test/unit/container_test.rb
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/script.go
services/crunch-dispatch-slurm/script_test.go
services/keep-balance/block_state.go
services/keep-balance/block_state_test.go
services/keep-balance/collection.go
services/keep-web/handler.go
tools/salt-install/local.params.example.single_host_multiple_hostnames
tools/salt-install/local.params.example.single_host_single_hostname
tools/salt-install/provision.sh

index 203c378bdcfa1dadc483234dcea453c91462066f..6ddb5c009c16eec67db8ea629afc2645064b2fbe 100644 (file)
@@ -92,3 +92,4 @@ sdk/cwl/tests/wf/hello.txt
 sdk/cwl/tests/wf/indir1/hello2.txt
 sdk/cwl/tests/chipseq/data/Genomes/*
 CITATION.cff
+SECURITY.md
\ No newline at end of file
diff --git a/SECURITY.md b/SECURITY.md
new file mode 100644 (file)
index 0000000..4e16ed5
--- /dev/null
@@ -0,0 +1,42 @@
+# Arvados Project Security Policy
+
+## Supported Versions
+
+The Arvados project will issue security fixes by making point releases
+on the current stable release series (X.Y.0, X.Y.1, X.Y.2, etc.).
+
+The most recent stable release version, along with release notes and
+upgrade notes documenting security fixes, can be found at these
+locations:
+
+https://arvados.org/releases/
+
+https://doc.arvados.org/admin/upgrading.html
+
+The Arvados project does not support versions older than the current
+stable release except by special arrangement (contact info@curii.com).
+
+Release announcements, including notification of security fixes, are
+sent to the Arvados announcement list:
+
+https://lists.arvados.org/mailman/listinfo/arvados
+
+## Reporting Security Issues
+
+If you believe you have found a security vulnerability in any Arvados-owned repository, please report it to us through coordinated disclosure.
+
+**Please do not report security vulnerabilities through public GitHub issues, discussions, or pull requests.**
+
+Instead, please send an email to dev@curii.com.
+
+Please include as much of the information listed below as you can to help us better understand and resolve the issue:
+
+  * The type of issue (e.g., remote code execution, SQL injection, or cross-site scripting)
+  * Full paths of source file(s) related to the manifestation of the issue
+  * The location of the affected source code (tag/branch/commit or direct URL)
+  * Any special configuration required to reproduce the issue
+  * Step-by-step instructions to reproduce the issue
+  * Proof-of-concept or exploit code (if possible)
+  * Impact of the issue, including how an attacker might exploit the issue
+
+This information will help us triage your report more quickly.
index df489d2eebc997c9efc8f3c55236f46021657fd4..7b8c8eafc81d31cd566c6cef81e050b5ca521c9a 100644 (file)
@@ -167,7 +167,7 @@ class ActionsController < ApplicationController
     flash = {}
 
     # set owner_uuid to current project, provided it is writable
-    action_data = Oj.load(params['action_data'] || "{}")
+    action_data = Oj.safe_load(params['action_data'] || "{}")
     if action_data['current_project_uuid'] and
         current_project = Group.find?(action_data['current_project_uuid']) and
         current_project.writable_by.andand.include?(current_user.uuid)
index 7481575a6111c437761af84afa802018bfba9d63..c2636bf5d74868464da789bdb8a35551869ebc82 100644 (file)
@@ -152,12 +152,12 @@ class ApplicationController < ActionController::Base
     if params[:filters]
       filters = params[:filters]
       if filters.is_a? String
-        filters = Oj.load filters
+        filters = Oj.safe_load filters
       elsif filters.is_a? Array
         filters = filters.collect do |filter|
           if filter.is_a? String
             # Accept filters[]=["foo","=","bar"]
-            Oj.load filter
+            Oj.safe_load filter
           else
             # Accept filters=[["foo","=","bar"]]
             filter
@@ -361,7 +361,7 @@ class ApplicationController < ActionController::Base
     @updates.keys.each do |attr|
       if @object.send(attr).is_a? Hash
         if @updates[attr].is_a? String
-          @updates[attr] = Oj.load @updates[attr]
+          @updates[attr] = Oj.safe_load @updates[attr]
         end
         if params[:merge] || params["merge_#{attr}".to_sym]
           # Merge provided Hash with current Hash, instead of
index f22ab50166591cb0875f32bfdba368af42e91fc3..697c469b563f3553f83c15a1089c0f96745d6b48 100644 (file)
@@ -564,7 +564,7 @@ module ApplicationHelper
                      "data-emptytext" => "none",
                      "data-placement" => "bottom",
                      "data-type" => "select",
-                     "data-source" => (opt_empty_selection + primary_type[:symbols].map {|i| {:value => i, :text => i} }).to_json,
+                     "data-source" => (opt_empty_selection + primary_type[:symbols].map {|i| {:value => cwl_shortname(i), :text => cwl_shortname(i)} }).to_json,
                      "data-url" => url_for(action: "update", id: object.uuid, controller: object.class.to_s.pluralize.underscore, merge: true),
                      "data-title" => "Set value for #{cwl_shortname(input_schema[:id])}",
                      "data-name" => dn,
index 3c9bfa793daa5a8ba143cbbf8fc40122654b6fff..47fcc4ce51ffe1aafb288453b9d9c6a0b777bd0d 100644 (file)
@@ -40,7 +40,7 @@ class ArvadosApiClient
     def initialize(request_url, api_response)
       @api_status = api_response.status_code
       @api_response_s = api_response.content
-      @api_response = Oj.load(@api_response_s, :symbol_keys => true)
+      @api_response = Oj.strict_load(@api_response_s, :symbol_keys => true)
       errors = @api_response[:errors]
       if errors.respond_to?(:join)
         errors = errors.join("\n\n")
@@ -167,7 +167,7 @@ class ArvadosApiClient
     end
 
     begin
-      resp = Oj.load(msg.content, :symbol_keys => true)
+      resp = Oj.strict_load(msg.content, :symbol_keys => true)
     rescue Oj::ParseError
       resp = nil
     end
index 84728b8c6882082bbaf015edf2dcb2450674d61f..2e8ead94cdb5e46f905f8872ec1d7b84ddec1f87 100644 (file)
@@ -158,7 +158,7 @@ class ActiveSupport::TestCase
     }
   end
   def json_response
-    Oj.load(@response.body)
+    Oj.safe_load(@response.body)
   end
 end
 
index aca6c5b797fa4ec3b036ee8300ae3f4fcbe5e885..55f8c33bc70c77d31f13f16bb924ee4c2a6a1613 100644 (file)
@@ -160,7 +160,9 @@ Options:
                fmt.Fprintf(stderr, "target UUID is not a container or container request UUID: %s\n", targetUUID)
                return 1
        }
-       sshconn, err := rpcconn.ContainerSSH(context.TODO(), arvados.ContainerSSHOptions{
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       sshconn, err := rpcconn.ContainerSSH(ctx, arvados.ContainerSSHOptions{
                UUID:          targetUUID,
                DetachKeys:    *detachKeys,
                LoginUsername: loginUsername,
@@ -176,7 +178,6 @@ Options:
                return 0
        }
 
-       ctx, cancel := context.WithCancel(context.Background())
        go func() {
                defer cancel()
                _, err := io.Copy(stdout, sshconn.Conn)
index f4a140c4069a9f0daa01d2263acb350ff604854c..743b91d69bde058e78183faf50a0965b56d4f7b6 100644 (file)
@@ -25,6 +25,7 @@ import (
        "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        check "gopkg.in/check.v1"
 )
@@ -53,6 +54,7 @@ func (s *ClientSuite) TestShellGateway(c *check.C) {
                ContainerUUID: uuid,
                Address:       "0.0.0.0:0",
                AuthSecret:    authSecret,
+               Log:           ctxlog.TestLogger(c),
                // Just forward connections to localhost instead of a
                // container, so we can test without running a
                // container.
@@ -86,6 +88,13 @@ func (s *ClientSuite) TestShellGateway(c *check.C) {
        cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
        cmd.Stdout = &stdout
        cmd.Stderr = &stderr
+       stdin, err := cmd.StdinPipe()
+       c.Assert(err, check.IsNil)
+       go fmt.Fprintln(stdin, "data appears on stdin, but stdin does not close; cmd should exit anyway, not hang")
+       time.AfterFunc(5*time.Second, func() {
+               c.Errorf("timed out -- remote end is probably hung waiting for us to close stdin")
+               stdin.Close()
+       })
        c.Check(cmd.Run(), check.IsNil)
        c.Check(stdout.String(), check.Equals, "ok\n")
 
index d9c41ca587b1194415e44377267d565c4ce4eeb5..438ca206daa06dbb9f5211fce8bf57ed788cf6a0 100644 (file)
@@ -28,6 +28,7 @@ import (
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/health"
+       dispatchslurm "git.arvados.org/arvados.git/services/crunch-dispatch-slurm"
        "git.arvados.org/arvados.git/services/githttpd"
        keepbalance "git.arvados.org/arvados.git/services/keep-balance"
        keepweb "git.arvados.org/arvados.git/services/keep-web"
@@ -53,6 +54,7 @@ var (
                "crunch-run":         crunchrun.Command,
                "dispatch-cloud":     dispatchcloud.Command,
                "dispatch-lsf":       lsf.DispatchCommand,
+               "dispatch-slurm":     dispatchslurm.Command,
                "git-httpd":          githttpd.Command,
                "health":             healthCommand,
                "install":            install.Command,
index d2bb7e797582a8c2a98c850face5442b9e07bfdb..148e1a166e0ac6d96499969e54cb7e5c0c1a3c1d 100644 (file)
@@ -161,6 +161,7 @@ navbar:
     - Computation with Crunch:
       - api/execution.html.textile.liquid
       - architecture/dispatchcloud.html.textile.liquid
+      - architecture/hpc.html.textile.liquid
       - architecture/singularity.html.textile.liquid
     - Other:
       - api/permission-model.html.textile.liquid
index 01c30f0e0eb88eecabc0269cabd40c3aabb07892..500e0d8c8c13aea9a73a9c2a00e9a7e87b381002 100644 (file)
@@ -28,7 +28,7 @@ h2. Overview
 table(table table-bordered table-condensed).
 |_.Service     |_.ExternalURL required? |_.InternalURLs required?|_.InternalURLs must be reachable from other cluster nodes?|_.Note|
 |railsapi       |no                     |yes|no ^1^|InternalURLs only used by Controller|
-|controller     |yes                    |yes|no ^2^|InternalURLs only used by reverse proxy (e.g. Nginx)|
+|controller     |yes                    |yes|yes ^2,4^|InternalURLs used by reverse proxy and container shell connections|
 |arvados-dispatch-cloud|no              |yes|no ^3^|InternalURLs only used to expose Prometheus metrics|
 |arvados-dispatch-lsf|no                |yes|no ^3^|InternalURLs only used to expose Prometheus metrics|
 |git-http       |yes                    |yes|no ^2^|InternalURLs only used by reverse proxy (e.g. Nginx)|
@@ -45,6 +45,7 @@ table(table table-bordered table-condensed).
 ^1^ If @Controller@ runs on a different host than @RailsAPI@, the @InternalURLs@ will need to be reachable from the host that runs @Controller@.
 ^2^ If the reverse proxy (e.g. Nginx) does not run on the same host as the Arvados service it fronts, the @InternalURLs@ will need to be reachable from the host that runs the reverse proxy.
 ^3^ If the Prometheus metrics are not collected from the same machine that runs the service, the @InternalURLs@ will need to be reachable from the host that collects the metrics.
+^4^ If dispatching containers to HPC (Slurm/LSF) and there are multiple @Controller@ services, they must be able to connect to one another using their InternalURLs, otherwise the "tunnel connections":{{site.baseurl}}/architecture/hpc.html enabling "container shell access":{{site.baseurl}}/install/container-shell-access.html will not work.
 
 When @InternalURLs@ do not need to be reachable from other nodes, it is most secure to use loopback addresses as @InternalURLs@, e.g. @http://127.0.0.1:9005@.
 
index 96e68239b64620d37ca0b76b11c882946e3232ff..d0dc7cbd87a51d1db2798f2d2b89edd5084c6b59 100644 (file)
@@ -28,10 +28,32 @@ TODO: extract this information based on git commit messages and generate changel
 <div class="releasenotes">
 </notextile>
 
-h2(#main). development main (as of 2022-06-02)
+h2(#main). development main (as of 2022-08-09)
+
+"previous: Upgrading to 2.4.2":#v2_4_2
+
+h2(#v2_4_2). v2.4.2 (2022-08-09)
 
 "previous: Upgrading to 2.4.1":#v2_4_1
 
+h3. GHSL-2022-063
+
+GitHub Security Lab (GHSL) reported a remote code execution (RCE) vulnerability in the Arvados Workbench that allows authenticated attackers to execute arbitrary code via specially crafted JSON payloads.
+
+This vulnerability is fixed in 2.4.2 ("#19316":https://dev.arvados.org/issues/19316).
+
+It is likely that this vulnerability exists in all versions of Arvados up to 2.4.1.
+
+This vulnerability is specific to the Ruby on Rails Workbench application ("Workbench 1").  We do not believe any other Arvados components, including the TypeScript browser-based Workbench application ("Workbench 2") or API Server, are vulnerable to this attack.
+
+h3. CVE-2022-31163 and CVE-2022-32224
+
+As a precaution, Arvados 2.4.2 includes security updates for Ruby on Rails and the TZInfo Ruby gem.  However, there are no known exploits in Arvados based on these CVEs.
+
+h3. Disable Sharing URLs UI
+
+There is now a configuration option @Workbench.DisableSharingURLsUI@ for admins to disable the user interface for the "sharing link" feature (URLs which can be sent to users to access the data in a specific collection in Arvados without an Arvados account), for organizations where sharing links violate their data sharing policy.
+
 h2(#v2_4_1). v2.4.1 (2022-06-02)
 
 "previous: Upgrading to 2.4.0":#v2_4_0
index 15fa207b1c16498a90940dea80c8c9ac99c57446..11f4f34fc83648347d7522c69c67337bec410ed9 100644 (file)
@@ -62,6 +62,7 @@ table(table table-bordered table-condensed).
 |runtime_auth_scopes|array of string|The scopes associated with the auth token used to run this container.||
 |output_storage_classes|array of strings|The storage classes that will be used for the log and output collections of this container request|default is ["default"]|
 |output_properties|hash|User metadata properties to set on the output collection.  The output collection will also have default properties "type" ("intermediate" or "output") and "container_request" (the uuid of container request that produced the collection).|
+|cumulative_cost|number|Estimated cost of the cloud VMs used to satisfy the request, including retried attempts and completed subrequests, but not including reused containers.|0 if container was reused or VM price information was not available.|
 
 h2(#priority). Priority
 
index 43163c555053f1cf749c749fa073e306ceaccd12..e6891621cd68c77b47daf181da9795fa0c8a0b4e 100644 (file)
@@ -61,6 +61,8 @@ Generally this will contain additional keys that are not present in any correspo
 |interactive_session_started|boolean|Indicates whether @arvados-client shell@ has been used to run commands in the container, which may have altered the container's behavior and output.||
 |output_storage_classes|array of strings|The storage classes that will be used for the log and output collections of this container||
 |output_properties|hash|User metadata properties to set on the output collection.|
+|cost|number|Estimated cost of the cloud VM used to run the container.|0 if not available.|
+|subrequests_cost|number|Total estimated cumulative cost of container requests submitted by this container.|0 if not available.|
 
 h2(#container_states). Container states
 
diff --git a/doc/architecture/hpc.html.textile.liquid b/doc/architecture/hpc.html.textile.liquid
new file mode 100644 (file)
index 0000000..03a4649
--- /dev/null
@@ -0,0 +1,29 @@
+---
+layout: default
+navsection: architecture
+title: Dispatching containers to HPC
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Arvados can be configured to run containers on an HPC cluster using Slurm or LSF, as an alternative to "dispatching to cloud VMs":dispatchcloud.html.
+
+In this configuration, the appropriate Arvados dispatcher service -- @crunch-dispatch-slurm@ or @arvados-dispatch-lsf@ -- picks up each container as it appears in the Arvados queue and submits a short shell script as a batch job to the HPC job queue. The shell script executes the @crunch-run@ container supervisor which retrieves the container specification from the Arvados controller, starts an arv-mount process, runs the container using @docker exec@ or @singularity exec@, and sends updates (logs, outputs, exit code, etc.) back to the Arvados controller.
+
+h2. Container communication channel (reverse https tunnel)
+
+The crunch-run program runs a gateway server to facilitate the “container shell” feature. However, depending on the site's network topology, the Arvados controller may not be able to connect directly to the compute node where a given crunch-run process is running.
+
+Instead, in the HPC configuration, crunch-run connects to the Arvados controller at startup and sets up a multiplexed tunnel, allowing the controller process to connect to crunch-run's gateway server without initiating a connection to the compute node, or even knowing the compute node's IP address.
+
+This means that when a client requests a container shell connection, the traffic goes through two or three servers:
+# The client connects to a controller host C1.
+# If the multiplexed tunnel is connected to a different controller host C2, then C1 proxies the incoming request to C2, using C2's InternalURL.
+# The controller host (C1 or C2) uses the multiplexed tunnel to connect to crunch-run's container gateway.
+
+h2. Scaling
+
+The @API.MaxConcurrentRequests@ configuration should not be set too low, or the long-lived tunnel connections can starve other clients.
index a8235ee70e505e777c123370c16dbf952c933141..3c86721c5bffd5c9081afe3c9010c3764a9afcdb 100644 (file)
@@ -16,15 +16,16 @@ h2. Quick start
 <pre>
 $ curl -O https://git.arvados.org/arvados.git/blob_plain/refs/heads/main:/tools/arvbox/bin/arvbox
 $ chmod +x arvbox
-$ ./arvbox start localdemo latest
+$ ./arvbox start localdemo
+$ ./arvbox root-cert
 $ ./arvbox adduser demouser demo@example.com
 </pre>
 
-You can now log in as @demouser@ using the password you selected.
+You will then need to "install the arvbox root certificate":#root-cert in your web browser.  After that, you can log in to Workbench as @demouser@ with the password you selected.
 
 h2. Requirements
 
-* Linux 3.x+ and Docker 1.9+
+* Linux 3.x+ and Docker 1.10+
 * Minimum of 3 GiB of RAM  + additional memory to run jobs
 * Minimum of 3 GiB of disk + storage for actual data
 
@@ -68,7 +69,7 @@ removeuser <username>
 listusers          list user logins
 </pre>
 
-h2. Install root certificate
+h2(#root-cert). Install root certificate
 
 Arvbox creates root certificate to authorize Arvbox services.  Installing the root certificate into your web browser will prevent security errors when accessing Arvbox services with your web browser.  Every  Arvbox instance generates a new root signing key.
 
diff --git a/go.mod b/go.mod
index 525bae11ee49be2fd83fe7d717384eaf580550dd..aced60dbc4e0ee83281ba8e03a8e0511a02ab220 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -72,6 +72,7 @@ require (
        github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
        github.com/golang/protobuf v1.5.0 // indirect
        github.com/googleapis/gax-go/v2 v2.0.5 // indirect
+       github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
        github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
        github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
        github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5 // indirect
diff --git a/go.sum b/go.sum
index 82a8d83d7e2be701f3ab3f9b41312883cb62f3ad..422a891e00ef26add10565538607630d2770863d 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -433,6 +433,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
 github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 h1:xixZ2bWeofWV68J+x6AzmKuVM/JWCQwkWm6GW/MUR6I=
+github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
index fbd01488a0be51c430c0c6efc9ef7862ebb88fe5..9269ddf27f59011b5dad2855edde8fb9b676ed41 100644 (file)
@@ -448,6 +448,7 @@ func (ldr *Loader) setLoopbackInstanceType(cfg *arvados.Config) error {
                        RAM:             hostram,
                        Scratch:         scratch,
                        IncludedScratch: scratch,
+                       Price:           1.0,
                }}
                cfg.Clusters[id] = cc
        }
index d9f587852d149da46cd49ddbfc9dd46095fe180e..ffb150bf26aa148b511f4bbde98305469ffef5df 100644 (file)
@@ -375,10 +375,14 @@ func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOption
        return conn.chooseBackend(options.UUID).ContainerUnlock(ctx, options)
 }
 
-func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ContainerSSHConnection, error) {
+func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ConnectionResponse, error) {
        return conn.chooseBackend(options.UUID).ContainerSSH(ctx, options)
 }
 
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (arvados.ConnectionResponse, error) {
+       return conn.chooseBackend(options.UUID).ContainerGatewayTunnel(ctx, options)
+}
+
 func (conn *Conn) ContainerRequestList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerRequestList, error) {
        return conn.generated_ContainerRequestList(ctx, options)
 }
index b0ec4293a38acfdf6a349db48fff96c7bf3f7a1a..4c49c007eb2b2233bed13dfa9e654a1a018a6eee 100644 (file)
@@ -1245,6 +1245,8 @@ func (s *IntegrationSuite) runContainer(c *check.C, clusterID string, token stri
                        time.Sleep(time.Second / 2)
                }
        }
+       c.Logf("cr.CumulativeCost == %f", cr.CumulativeCost)
+       c.Check(cr.CumulativeCost, check.Not(check.Equals), 0.0)
        if expectExitCode >= 0 {
                c.Check(ctr.State, check.Equals, arvados.ContainerStateComplete)
                c.Check(ctr.ExitCode, check.Equals, expectExitCode)
index 104cfe28f5e0cccfb9cf785955229b4f3b297fdf..a36822ad6b1f5df1f73ffbc3536d76a7215f1817 100644 (file)
@@ -11,6 +11,7 @@ import (
        "net/http"
        "os"
        "strings"
+       "sync"
        "time"
 
        "git.arvados.org/arvados.git/lib/controller/railsproxy"
@@ -18,6 +19,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "github.com/hashicorp/yamux"
        "github.com/sirupsen/logrus"
 )
 
@@ -31,6 +33,8 @@ type Conn struct {
        lastVocabularyRefreshCheck time.Time
        lastVocabularyError        error
        loginController
+       gwTunnels     map[string]*yamux.Session
+       gwTunnelsLock sync.Mutex
 }
 
 func NewConn(cluster *arvados.Cluster) *Conn {
index 3b40eccaff68c138e498b9ec497f5f704e249f64..77c5182e9cd924460b81a39770f647baefb3af19 100644 (file)
@@ -6,21 +6,34 @@ package localdb
 
 import (
        "bufio"
+       "bytes"
        "context"
        "crypto/hmac"
        "crypto/sha256"
+       "crypto/subtle"
        "crypto/tls"
        "crypto/x509"
        "errors"
        "fmt"
+       "io"
+       "io/ioutil"
+       "net"
        "net/http"
        "net/url"
        "strings"
 
+       "git.arvados.org/arvados.git/lib/controller/rpc"
+       "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/auth"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "github.com/hashicorp/yamux"
+)
+
+var (
+       forceProxyForTest       = false
+       forceInternalURLForTest *arvados.URL
 )
 
 // ContainerSSH returns a connection to the SSH server in the
@@ -29,51 +42,112 @@ import (
 //
 // If the returned error is nil, the caller is responsible for closing
 // sshconn.Conn.
-func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOptions) (sshconn arvados.ContainerSSHConnection, err error) {
+func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOptions) (sshconn arvados.ConnectionResponse, err error) {
        user, err := conn.railsProxy.UserGetCurrent(ctx, arvados.GetOptions{})
        if err != nil {
-               return
+               return sshconn, err
        }
        ctr, err := conn.railsProxy.ContainerGet(ctx, arvados.GetOptions{UUID: opts.UUID})
        if err != nil {
-               return
+               return sshconn, err
        }
        ctxRoot := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{conn.cluster.SystemRootToken}})
        if !user.IsAdmin || !conn.cluster.Containers.ShellAccess.Admin {
                if !conn.cluster.Containers.ShellAccess.User {
-                       err = httpserver.ErrorWithStatus(errors.New("shell access is disabled in config"), http.StatusServiceUnavailable)
-                       return
+                       return sshconn, httpserver.ErrorWithStatus(errors.New("shell access is disabled in config"), http.StatusServiceUnavailable)
                }
-               var crs arvados.ContainerRequestList
-               crs, err = conn.railsProxy.ContainerRequestList(ctxRoot, arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"container_uuid", "=", opts.UUID}}})
+               crs, err := conn.railsProxy.ContainerRequestList(ctxRoot, arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"container_uuid", "=", opts.UUID}}})
                if err != nil {
-                       return
+                       return sshconn, err
                }
                for _, cr := range crs.Items {
                        if cr.ModifiedByUserUUID != user.UUID {
-                               err = httpserver.ErrorWithStatus(errors.New("permission denied: container is associated with requests submitted by other users"), http.StatusForbidden)
-                               return
+                               return sshconn, httpserver.ErrorWithStatus(errors.New("permission denied: container is associated with requests submitted by other users"), http.StatusForbidden)
                        }
                }
                if crs.ItemsAvailable != len(crs.Items) {
-                       err = httpserver.ErrorWithStatus(errors.New("incomplete response while checking permission"), http.StatusInternalServerError)
-                       return
+                       return sshconn, httpserver.ErrorWithStatus(errors.New("incomplete response while checking permission"), http.StatusInternalServerError)
                }
        }
 
-       switch ctr.State {
-       case arvados.ContainerStateQueued, arvados.ContainerStateLocked:
-               err = httpserver.ErrorWithStatus(fmt.Errorf("container is not running yet (state is %q)", ctr.State), http.StatusServiceUnavailable)
-               return
-       case arvados.ContainerStateRunning:
-               if ctr.GatewayAddress == "" {
-                       err = httpserver.ErrorWithStatus(errors.New("container is running but gateway is not available -- installation problem or feature not supported"), http.StatusServiceUnavailable)
-                       return
+       conn.gwTunnelsLock.Lock()
+       tunnel := conn.gwTunnels[opts.UUID]
+       conn.gwTunnelsLock.Unlock()
+
+       if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked {
+               return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("container is not running yet (state is %q)", ctr.State), http.StatusServiceUnavailable)
+       } else if ctr.State != arvados.ContainerStateRunning {
+               return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("container has ended (state is %q)", ctr.State), http.StatusGone)
+       }
+
+       // targetHost is the value we'll use in the Host header in our
+       // "Upgrade: ssh" http request. It's just a placeholder
+       // "localhost", unless we decide to connect directly, in which
+       // case we'll set it to the gateway's external ip:host. (The
+       // gateway doesn't even look at it, but we might as well.)
+       targetHost := "localhost"
+       myURL, _ := service.URLFromContext(ctx)
+
+       var rawconn net.Conn
+       if host, _, splitErr := net.SplitHostPort(ctr.GatewayAddress); splitErr == nil && host != "" && host != "127.0.0.1" {
+               // If crunch-run provided a GatewayAddress like
+               // "ipaddr:port", that means "ipaddr" is one of the
+               // external interfaces where the gateway is
+               // listening. In that case, it's the most
+               // reliable/direct option, so we use it even if a
+               // tunnel might also be available.
+               targetHost = ctr.GatewayAddress
+               rawconn, err = net.Dial("tcp", ctr.GatewayAddress)
+               if err != nil {
+                       return sshconn, httpserver.ErrorWithStatus(err, http.StatusServiceUnavailable)
                }
-       default:
-               err = httpserver.ErrorWithStatus(fmt.Errorf("container has ended (state is %q)", ctr.State), http.StatusGone)
-               return
+       } else if tunnel != nil && !(forceProxyForTest && !opts.NoForward) {
+               // If we can't connect directly, and the gateway has
+               // established a yamux tunnel with us, connect through
+               // the tunnel.
+               //
+               // ...except: forceProxyForTest means we are emulating
+               // a situation where the gateway has established a
+               // yamux tunnel with controller B, and the
+               // ContainerSSH request arrives at controller A. If
+               // opts.NoForward==false then we are acting as A, so
+               // we pretend not to have a tunnel, and fall through
+               // to the "tunurl" case below. If opts.NoForward==true
+               // then the client is A and we are acting as B, so we
+               // connect to our tunnel.
+               rawconn, err = tunnel.Open()
+               if err != nil {
+                       return sshconn, httpserver.ErrorWithStatus(err, http.StatusServiceUnavailable)
+               }
+       } else if ctr.GatewayAddress == "" {
+               return sshconn, httpserver.ErrorWithStatus(errors.New("container is running but gateway is not available"), http.StatusServiceUnavailable)
+       } else if tunurl := strings.TrimPrefix(ctr.GatewayAddress, "tunnel "); tunurl != ctr.GatewayAddress &&
+               tunurl != "" &&
+               tunurl != myURL.String() &&
+               !opts.NoForward {
+               // If crunch-run provided a GatewayAddress like
+               // "tunnel https://10.0.0.10:1010/", that means the
+               // gateway has established a yamux tunnel with the
+               // controller process at the indicated InternalURL
+               // (which isn't us, otherwise we would have had
+               // "tunnel != nil" above). We need to proxy through to
+               // the other controller process in order to use the
+               // tunnel.
+               for u := range conn.cluster.Services.Controller.InternalURLs {
+                       if u.String() == tunurl {
+                               ctxlog.FromContext(ctx).Debugf("proxying ContainerSSH request to other controller at %s", u)
+                               u := url.URL(u)
+                               arpc := rpc.NewConn(conn.cluster.ClusterID, &u, conn.cluster.TLS.Insecure, rpc.PassthroughTokenProvider)
+                               opts.NoForward = true
+                               return arpc.ContainerSSH(ctx, opts)
+                       }
+               }
+               ctxlog.FromContext(ctx).Warnf("container gateway provided a tunnel endpoint %s that is not one of Services.Controller.InternalURLs", tunurl)
+               return sshconn, httpserver.ErrorWithStatus(errors.New("container gateway is running but tunnel endpoint is invalid"), http.StatusServiceUnavailable)
+       } else {
+               return sshconn, httpserver.ErrorWithStatus(errors.New("container gateway is running but tunnel is down"), http.StatusServiceUnavailable)
        }
+
        // crunch-run uses a self-signed / unverifiable TLS
        // certificate, so we use the following scheme to ensure we're
        // not talking to a MITM.
@@ -93,7 +167,7 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt
        // X-Arvados-Authorization-Response header, proving that the
        // server knows ctrKey.
        var requestAuth, respondAuth string
-       netconn, err := tls.Dial("tcp", ctr.GatewayAddress, &tls.Config{
+       tlsconn := tls.Client(rawconn, &tls.Config{
                InsecureSkipVerify: true,
                VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
                        if len(rawCerts) == 0 {
@@ -111,47 +185,57 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt
                        return nil
                },
        })
+       err = tlsconn.HandshakeContext(ctx)
        if err != nil {
-               err = httpserver.ErrorWithStatus(err, http.StatusBadGateway)
-               return
+               return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("TLS handshake failed: %w", err), http.StatusBadGateway)
        }
        if respondAuth == "" {
-               err = httpserver.ErrorWithStatus(errors.New("BUG: no respondAuth"), http.StatusInternalServerError)
-               return
+               tlsconn.Close()
+               return sshconn, httpserver.ErrorWithStatus(errors.New("BUG: no respondAuth"), http.StatusInternalServerError)
        }
-       bufr := bufio.NewReader(netconn)
-       bufw := bufio.NewWriter(netconn)
+       bufr := bufio.NewReader(tlsconn)
+       bufw := bufio.NewWriter(tlsconn)
 
        u := url.URL{
                Scheme: "http",
-               Host:   ctr.GatewayAddress,
+               Host:   targetHost,
                Path:   "/ssh",
        }
-       bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n")
+       postform := url.Values{
+               "uuid":           {opts.UUID},
+               "detach_keys":    {opts.DetachKeys},
+               "login_username": {opts.LoginUsername},
+               "no_forward":     {fmt.Sprintf("%v", opts.NoForward)},
+       }
+       postdata := postform.Encode()
+       bufw.WriteString("POST " + u.String() + " HTTP/1.1\r\n")
        bufw.WriteString("Host: " + u.Host + "\r\n")
        bufw.WriteString("Upgrade: ssh\r\n")
-       bufw.WriteString("X-Arvados-Target-Uuid: " + opts.UUID + "\r\n")
        bufw.WriteString("X-Arvados-Authorization: " + requestAuth + "\r\n")
-       bufw.WriteString("X-Arvados-Detach-Keys: " + opts.DetachKeys + "\r\n")
-       bufw.WriteString("X-Arvados-Login-Username: " + opts.LoginUsername + "\r\n")
+       bufw.WriteString("Content-Type: application/x-www-form-urlencoded\r\n")
+       fmt.Fprintf(bufw, "Content-Length: %d\r\n", len(postdata))
        bufw.WriteString("\r\n")
+       bufw.WriteString(postdata)
        bufw.Flush()
-       resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"})
+       resp, err := http.ReadResponse(bufr, &http.Request{Method: "POST"})
        if err != nil {
-               err = httpserver.ErrorWithStatus(fmt.Errorf("error reading http response from gateway: %w", err), http.StatusBadGateway)
-               netconn.Close()
-               return
+               tlsconn.Close()
+               return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("error reading http response from gateway: %w", err), http.StatusBadGateway)
        }
-       if resp.Header.Get("X-Arvados-Authorization-Response") != respondAuth {
-               err = httpserver.ErrorWithStatus(errors.New("bad X-Arvados-Authorization-Response header"), http.StatusBadGateway)
-               netconn.Close()
-               return
+       defer resp.Body.Close()
+       if resp.StatusCode != http.StatusSwitchingProtocols {
+               body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 1000))
+               tlsconn.Close()
+               return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("unexpected status %s %q", resp.Status, body), http.StatusBadGateway)
        }
        if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" ||
                strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {
-               err = httpserver.ErrorWithStatus(errors.New("bad upgrade"), http.StatusBadGateway)
-               netconn.Close()
-               return
+               tlsconn.Close()
+               return sshconn, httpserver.ErrorWithStatus(errors.New("bad upgrade"), http.StatusBadGateway)
+       }
+       if resp.Header.Get("X-Arvados-Authorization-Response") != respondAuth {
+               tlsconn.Close()
+               return sshconn, httpserver.ErrorWithStatus(errors.New("bad X-Arvados-Authorization-Response header"), http.StatusBadGateway)
        }
 
        if !ctr.InteractiveSessionStarted {
@@ -162,13 +246,65 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt
                        },
                })
                if err != nil {
-                       netconn.Close()
-                       return
+                       tlsconn.Close()
+                       return sshconn, httpserver.ErrorWithStatus(err, http.StatusInternalServerError)
                }
        }
 
-       sshconn.Conn = netconn
+       sshconn.Conn = tlsconn
        sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
        sshconn.Logger = ctxlog.FromContext(ctx)
+       sshconn.Header = http.Header{"Upgrade": {"ssh"}}
+       return sshconn, nil
+}
+
+// ContainerGatewayTunnel sets up a tunnel enabling us (controller) to
+// connect to the caller's (crunch-run's) gateway server.
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, opts arvados.ContainerGatewayTunnelOptions) (resp arvados.ConnectionResponse, err error) {
+       h := hmac.New(sha256.New, []byte(conn.cluster.SystemRootToken))
+       fmt.Fprint(h, opts.UUID)
+       authSecret := fmt.Sprintf("%x", h.Sum(nil))
+       if subtle.ConstantTimeCompare([]byte(authSecret), []byte(opts.AuthSecret)) != 1 {
+               ctxlog.FromContext(ctx).Info("received incorrect auth_secret")
+               return resp, httpserver.ErrorWithStatus(errors.New("authentication error"), http.StatusUnauthorized)
+       }
+
+       muxconn, clientconn := net.Pipe()
+       tunnel, err := yamux.Server(muxconn, nil)
+       if err != nil {
+               clientconn.Close()
+               return resp, httpserver.ErrorWithStatus(err, http.StatusInternalServerError)
+       }
+
+       conn.gwTunnelsLock.Lock()
+       if conn.gwTunnels == nil {
+               conn.gwTunnels = map[string]*yamux.Session{opts.UUID: tunnel}
+       } else {
+               conn.gwTunnels[opts.UUID] = tunnel
+       }
+       conn.gwTunnelsLock.Unlock()
+
+       go func() {
+               <-tunnel.CloseChan()
+               conn.gwTunnelsLock.Lock()
+               if conn.gwTunnels[opts.UUID] == tunnel {
+                       delete(conn.gwTunnels, opts.UUID)
+               }
+               conn.gwTunnelsLock.Unlock()
+       }()
+
+       // Assuming we're acting as the backend of an http server,
+       // lib/controller/router will call resp's ServeHTTP handler,
+       // which upgrades the incoming http connection to a raw socket
+       // and connects it to our yamux.Server through our net.Pipe().
+       resp.Conn = clientconn
+       resp.Bufrw = &bufio.ReadWriter{Reader: bufio.NewReader(&bytes.Buffer{}), Writer: bufio.NewWriter(&bytes.Buffer{})}
+       resp.Logger = ctxlog.FromContext(ctx)
+       resp.Header = http.Header{"Upgrade": {"tunnel"}}
+       if u, ok := service.URLFromContext(ctx); ok {
+               resp.Header.Set("X-Arvados-Internal-Url", u.String())
+       } else if forceInternalURLForTest != nil {
+               resp.Header.Set("X-Arvados-Internal-Url", forceInternalURLForTest.String())
+       }
        return
 }
index 271760420153481daac1f0f129a63c684591b94b..2c882c7852a87b0ed23e2821d3e5fde6ec400439 100644 (file)
@@ -12,9 +12,14 @@ import (
        "io"
        "io/ioutil"
        "net"
+       "net/http/httptest"
+       "net/url"
+       "strings"
        "time"
 
        "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/lib/controller/router"
+       "git.arvados.org/arvados.git/lib/controller/rpc"
        "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
@@ -55,12 +60,26 @@ func (s *ContainerGatewaySuite) SetUpSuite(c *check.C) {
        fmt.Fprint(h, s.ctrUUID)
        authKey := fmt.Sprintf("%x", h.Sum(nil))
 
+       rtr := router.New(s.localdb, router.Config{})
+       srv := httptest.NewUnstartedServer(rtr)
+       srv.StartTLS()
+       // the test setup doesn't use lib/service so
+       // service.URLFromContext() returns nothing -- instead, this
+       // is how we advertise our internal URL and enable
+       // proxy-to-other-controller mode,
+       forceInternalURLForTest = &arvados.URL{Scheme: "https", Host: srv.Listener.Addr().String()}
+       ac := &arvados.Client{
+               APIHost:   srv.Listener.Addr().String(),
+               AuthToken: arvadostest.Dispatch1Token,
+               Insecure:  true,
+       }
        s.gw = &crunchrun.Gateway{
                ContainerUUID: s.ctrUUID,
                AuthSecret:    authKey,
                Address:       "localhost:0",
                Log:           ctxlog.TestLogger(c),
                Target:        crunchrun.GatewayTargetStub{},
+               ArvadosClient: ac,
        }
        c.Assert(s.gw.Start(), check.IsNil)
        rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
@@ -69,18 +88,25 @@ func (s *ContainerGatewaySuite) SetUpSuite(c *check.C) {
                Attrs: map[string]interface{}{
                        "state": arvados.ContainerStateLocked}})
        c.Assert(err, check.IsNil)
-       _, err = s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+}
+
+func (s *ContainerGatewaySuite) SetUpTest(c *check.C) {
+       // clear any tunnel sessions started by previous test cases
+       s.localdb.gwTunnelsLock.Lock()
+       s.localdb.gwTunnels = nil
+       s.localdb.gwTunnelsLock.Unlock()
+
+       rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
+       _, err := s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
                UUID: s.ctrUUID,
                Attrs: map[string]interface{}{
                        "state":           arvados.ContainerStateRunning,
                        "gateway_address": s.gw.Address}})
        c.Assert(err, check.IsNil)
-}
 
-func (s *ContainerGatewaySuite) SetUpTest(c *check.C) {
        s.cluster.Containers.ShellAccess.Admin = true
        s.cluster.Containers.ShellAccess.User = true
-       _, err := arvadostest.DB(c, s.cluster).Exec(`update containers set interactive_session_started=$1 where uuid=$2`, false, s.ctrUUID)
+       _, err = arvadostest.DB(c, s.cluster).Exec(`update containers set interactive_session_started=$1 where uuid=$2`, false, s.ctrUUID)
        c.Check(err, check.IsNil)
 }
 
@@ -234,3 +260,136 @@ func (s *ContainerGatewaySuite) TestConnectFail(c *check.C) {
        _, err = s.localdb.ContainerSSH(ctx, arvados.ContainerSSHOptions{UUID: s.ctrUUID})
        c.Check(err, check.ErrorMatches, `.* 404 .*`)
 }
+
+func (s *ContainerGatewaySuite) TestCreateTunnel(c *check.C) {
+       // no AuthSecret
+       conn, err := s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+               UUID: s.ctrUUID,
+       })
+       c.Check(err, check.ErrorMatches, `authentication error`)
+       c.Check(conn.Conn, check.IsNil)
+
+       // bogus AuthSecret
+       conn, err = s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+               UUID:       s.ctrUUID,
+               AuthSecret: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
+       })
+       c.Check(err, check.ErrorMatches, `authentication error`)
+       c.Check(conn.Conn, check.IsNil)
+
+       // good AuthSecret
+       conn, err = s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+               UUID:       s.ctrUUID,
+               AuthSecret: s.gw.AuthSecret,
+       })
+       c.Check(err, check.IsNil)
+       c.Check(conn.Conn, check.NotNil)
+}
+
+func (s *ContainerGatewaySuite) TestConnectThroughTunnelWithProxyOK(c *check.C) {
+       forceProxyForTest = true
+       defer func() { forceProxyForTest = false }()
+       s.cluster.Services.Controller.InternalURLs[*forceInternalURLForTest] = arvados.ServiceInstance{}
+       defer delete(s.cluster.Services.Controller.InternalURLs, *forceInternalURLForTest)
+       s.testConnectThroughTunnel(c, "")
+}
+
+func (s *ContainerGatewaySuite) TestConnectThroughTunnelWithProxyError(c *check.C) {
+       forceProxyForTest = true
+       defer func() { forceProxyForTest = false }()
+       // forceInternalURLForTest shouldn't be used because it isn't
+       // listed in s.cluster.Services.Controller.InternalURLs
+       s.testConnectThroughTunnel(c, `.*tunnel endpoint is invalid.*`)
+}
+
+func (s *ContainerGatewaySuite) TestConnectThroughTunnelNoProxyOK(c *check.C) {
+       s.testConnectThroughTunnel(c, "")
+}
+
+func (s *ContainerGatewaySuite) testConnectThroughTunnel(c *check.C, expectErrorMatch string) {
+       rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
+       // Until the tunnel starts up, set gateway_address to a value
+       // that can't work. We want to ensure the only way we can
+       // reach the gateway is through the tunnel.
+       gwaddr := "127.0.0.1:0"
+       tungw := &crunchrun.Gateway{
+               ContainerUUID: s.ctrUUID,
+               AuthSecret:    s.gw.AuthSecret,
+               Log:           ctxlog.TestLogger(c),
+               Target:        crunchrun.GatewayTargetStub{},
+               ArvadosClient: s.gw.ArvadosClient,
+               UpdateTunnelURL: func(url string) {
+                       c.Logf("UpdateTunnelURL(%q)", url)
+                       gwaddr = "tunnel " + url
+                       s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+                               UUID: s.ctrUUID,
+                               Attrs: map[string]interface{}{
+                                       "gateway_address": gwaddr}})
+               },
+       }
+       c.Assert(tungw.Start(), check.IsNil)
+
+       // We didn't supply an external hostname in the Address field,
+       // so Start() should assign a local address.
+       host, _, err := net.SplitHostPort(tungw.Address)
+       c.Assert(err, check.IsNil)
+       c.Check(host, check.Equals, "127.0.0.1")
+
+       _, err = s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+               UUID: s.ctrUUID,
+               Attrs: map[string]interface{}{
+                       "state":           arvados.ContainerStateRunning,
+                       "gateway_address": gwaddr}})
+       c.Assert(err, check.IsNil)
+
+       for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); time.Sleep(time.Second / 2) {
+               ctr, err := s.localdb.ContainerGet(s.ctx, arvados.GetOptions{UUID: s.ctrUUID})
+               c.Assert(err, check.IsNil)
+               c.Check(ctr.InteractiveSessionStarted, check.Equals, false)
+               c.Logf("ctr.GatewayAddress == %s", ctr.GatewayAddress)
+               if strings.HasPrefix(ctr.GatewayAddress, "tunnel ") {
+                       break
+               }
+       }
+
+       c.Log("connecting to gateway through tunnel")
+       arpc := rpc.NewConn("", &url.URL{Scheme: "https", Host: s.gw.ArvadosClient.APIHost}, true, rpc.PassthroughTokenProvider)
+       sshconn, err := arpc.ContainerSSH(s.ctx, arvados.ContainerSSHOptions{UUID: s.ctrUUID})
+       if expectErrorMatch != "" {
+               c.Check(err, check.ErrorMatches, expectErrorMatch)
+               return
+       }
+       c.Assert(err, check.IsNil)
+       c.Assert(sshconn.Conn, check.NotNil)
+       defer sshconn.Conn.Close()
+
+       done := make(chan struct{})
+       go func() {
+               defer close(done)
+
+               // Receive text banner
+               buf := make([]byte, 12)
+               _, err := io.ReadFull(sshconn.Conn, buf)
+               c.Check(err, check.IsNil)
+               c.Check(string(buf), check.Equals, "SSH-2.0-Go\r\n")
+
+               // Send text banner
+               _, err = sshconn.Conn.Write([]byte("SSH-2.0-Fake\r\n"))
+               c.Check(err, check.IsNil)
+
+               // Receive binary
+               _, err = io.ReadFull(sshconn.Conn, buf[:4])
+               c.Check(err, check.IsNil)
+
+               // If we can get this far into an SSH handshake...
+               c.Logf("was able to read %x -- success, tunnel is working", buf[:4])
+       }()
+       select {
+       case <-done:
+       case <-time.After(time.Second):
+               c.Fail()
+       }
+       ctr, err := s.localdb.ContainerGet(s.ctx, arvados.GetOptions{UUID: s.ctrUUID})
+       c.Check(err, check.IsNil)
+       c.Check(ctr.InteractiveSessionStarted, check.Equals, true)
+}
index 13dfcac16abb0bb27c7b1f3d50d024436453f97c..47b8cb47112ad5990d2f80dd23c72cf98fb85a70 100644 (file)
@@ -63,10 +63,13 @@ func (p *proxy) Do(
                        hdrOut[k] = v
                }
        }
-       xff := reqIn.RemoteAddr
-       if xffIn := reqIn.Header.Get("X-Forwarded-For"); xffIn != "" {
-               xff = xffIn + "," + xff
+       xff := ""
+       for _, xffIn := range reqIn.Header["X-Forwarded-For"] {
+               if xffIn != "" {
+                       xff += xffIn + ","
+               }
        }
+       xff += reqIn.RemoteAddr
        hdrOut.Set("X-Forwarded-For", xff)
        if hdrOut.Get("X-Forwarded-Proto") == "" {
                hdrOut.Set("X-Forwarded-Proto", reqIn.URL.Scheme)
index 06141b1033e3f0034e003eab07da11c17153496e..31f2e1d7baf5098a377ffe9d1acd7b737958231d 100644 (file)
@@ -176,6 +176,7 @@ var boolParams = map[string]bool{
        "bypass_federation":       true,
        "recursive":               true,
        "exclude_home_project":    true,
+       "no_forward":              true,
 }
 
 func stringToBool(s string) bool {
index 586ea8e676ec9ae2a08a7f1855f7f17d8ef5754a..80d5e929850cd18df389daeddb18eb4b12387a38 100644 (file)
@@ -244,6 +244,24 @@ func (rtr *router) addRoutes() {
                                return rtr.backend.ContainerSSH(ctx, *opts.(*arvados.ContainerSSHOptions))
                        },
                },
+               {
+                       // arvados-client built before commit
+                       // bdc29d3129f6d75aa9ce0a24ffb849a272b06f08
+                       // used GET with params in headers instead of
+                       // POST form
+                       arvados.APIEndpoint{"GET", "arvados/v1/connect/{uuid}/ssh", ""},
+                       func() interface{} { return &arvados.ContainerSSHOptions{} },
+                       func(ctx context.Context, opts interface{}) (interface{}, error) {
+                               return nil, httpError(http.StatusGone, fmt.Errorf("API endpoint is obsolete -- please upgrade your arvados-client program"))
+                       },
+               },
+               {
+                       arvados.EndpointContainerGatewayTunnel,
+                       func() interface{} { return &arvados.ContainerGatewayTunnelOptions{} },
+                       func(ctx context.Context, opts interface{}) (interface{}, error) {
+                               return rtr.backend.ContainerGatewayTunnel(ctx, *opts.(*arvados.ContainerGatewayTunnelOptions))
+                       },
+               },
                {
                        arvados.EndpointGroupCreate,
                        func() interface{} { return &arvados.CreateOptions{} },
index 1148068d70896c2ff4b16074c731fedf23bb5bbb..0e532f23c070d8b5c64a15bd8bef46494702ae5a 100644 (file)
@@ -23,6 +23,7 @@ import (
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/auth"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
 )
 
@@ -331,7 +332,36 @@ func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOption
 // ContainerSSH returns a connection to the out-of-band SSH server for
 // a running container. If the returned error is nil, the caller is
 // responsible for closing sshconn.Conn.
-func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (sshconn arvados.ContainerSSHConnection, err error) {
+func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (sshconn arvados.ConnectionResponse, err error) {
+       u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1))
+       if err != nil {
+               err = fmt.Errorf("url.Parse: %w", err)
+               return
+       }
+       return conn.socket(ctx, u, "ssh", url.Values{
+               "detach_keys":    {options.DetachKeys},
+               "login_username": {options.LoginUsername},
+               "no_forward":     {fmt.Sprintf("%v", options.NoForward)},
+       })
+}
+
+// ContainerGatewayTunnel returns a connection to a yamux session on
+// the controller. The caller should connect the returned resp.Conn to
+// a client-side yamux session.
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (tunnelconn arvados.ConnectionResponse, err error) {
+       u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", options.UUID, -1))
+       if err != nil {
+               err = fmt.Errorf("url.Parse: %w", err)
+               return
+       }
+       return conn.socket(ctx, u, "tunnel", url.Values{
+               "auth_secret": {options.AuthSecret},
+       })
+}
+
+// socket sets up a socket using the specified API endpoint and
+// upgrade header.
+func (conn *Conn) socket(ctx context.Context, u *url.URL, upgradeHeader string, postform url.Values) (connresp arvados.ConnectionResponse, err error) {
        addr := conn.baseURL.Host
        if strings.Index(addr, ":") < 1 || (strings.Contains(addr, "::") && addr[0] != '[') {
                // hostname or ::1 or 1::1
@@ -343,8 +373,7 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH
        }
        netconn, err := tls.Dial("tcp", addr, &tls.Config{InsecureSkipVerify: insecure})
        if err != nil {
-               err = fmt.Errorf("tls.Dial: %w", err)
-               return
+               return connresp, fmt.Errorf("tls.Dial: %w", err)
        }
        defer func() {
                if err != nil {
@@ -354,36 +383,30 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH
        bufr := bufio.NewReader(netconn)
        bufw := bufio.NewWriter(netconn)
 
-       u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1))
-       if err != nil {
-               err = fmt.Errorf("tls.Dial: %w", err)
-               return
-       }
-       u.RawQuery = url.Values{
-               "detach_keys":    {options.DetachKeys},
-               "login_username": {options.LoginUsername},
-       }.Encode()
        tokens, err := conn.tokenProvider(ctx)
        if err != nil {
-               return
+               return connresp, err
        } else if len(tokens) < 1 {
-               err = httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
-               return
+               return connresp, httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
        }
-       bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n")
+       postdata := postform.Encode()
+       bufw.WriteString("POST " + u.String() + " HTTP/1.1\r\n")
        bufw.WriteString("Authorization: Bearer " + tokens[0] + "\r\n")
        bufw.WriteString("Host: " + u.Host + "\r\n")
-       bufw.WriteString("Upgrade: ssh\r\n")
+       bufw.WriteString("Upgrade: " + upgradeHeader + "\r\n")
+       bufw.WriteString("Content-Type: application/x-www-form-urlencoded\r\n")
+       fmt.Fprintf(bufw, "Content-Length: %d\r\n", len(postdata))
        bufw.WriteString("\r\n")
+       bufw.WriteString(postdata)
        bufw.Flush()
-       resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"})
+       resp, err := http.ReadResponse(bufr, &http.Request{Method: "POST"})
        if err != nil {
-               err = fmt.Errorf("http.ReadResponse: %w", err)
-               return
+               return connresp, fmt.Errorf("http.ReadResponse: %w", err)
        }
+       defer resp.Body.Close()
        if resp.StatusCode != http.StatusSwitchingProtocols {
-               defer resp.Body.Close()
-               body, _ := ioutil.ReadAll(resp.Body)
+               ctxlog.FromContext(ctx).Infof("rpc.Conn.socket: server %s did not switch protocols, got status %s", u.String(), resp.Status)
+               body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 10000))
                var message string
                var errDoc httpserver.ErrorResponse
                if err := json.Unmarshal(body, &errDoc); err == nil {
@@ -391,17 +414,16 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH
                } else {
                        message = fmt.Sprintf("%q", body)
                }
-               err = fmt.Errorf("server did not provide a tunnel: %s (HTTP %d)", message, resp.StatusCode)
-               return
+               return connresp, fmt.Errorf("server did not provide a tunnel: %s: %s", resp.Status, message)
        }
-       if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" ||
+       if strings.ToLower(resp.Header.Get("Upgrade")) != upgradeHeader ||
                strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {
-               err = fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection"))
-               return
+               return connresp, fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection"))
        }
-       sshconn.Conn = netconn
-       sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
-       return
+       connresp.Conn = netconn
+       connresp.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
+       connresp.Header = resp.Header
+       return connresp, nil
 }
 
 func (conn *Conn) ContainerRequestCreate(ctx context.Context, options arvados.CreateOptions) (arvados.ContainerRequest, error) {
index 01457015e16f1870bf4adf4785b8f9c08cec10d5..3cb93fc746a78d01afd6a40023e00903a5c3a846 100644 (file)
@@ -14,16 +14,22 @@ import (
        "io"
        "net"
        "net/http"
+       "net/url"
        "os"
        "os/exec"
        "sync"
        "syscall"
+       "time"
 
+       "git.arvados.org/arvados.git/lib/controller/rpc"
        "git.arvados.org/arvados.git/lib/selfsigned"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/auth"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "github.com/creack/pty"
        "github.com/google/shlex"
+       "github.com/hashicorp/yamux"
        "golang.org/x/crypto/ssh"
        "golang.org/x/net/context"
 )
@@ -45,12 +51,32 @@ func (GatewayTargetStub) InjectCommand(ctx context.Context, detachKeys, username
 
 type Gateway struct {
        ContainerUUID string
-       Address       string // listen host:port; if port=0, Start() will change it to the selected port
-       AuthSecret    string
-       Target        GatewayTarget
-       Log           interface {
+       // Caller should set Address to "", or "host:0" or "host:port"
+       // where host is a known external IP address; port is a
+       // desired port number to listen on; and ":0" chooses an
+       // available dynamic port.
+       //
+       // If Address is "", Start() listens only on the loopback
+       // interface (and changes Address to "127.0.0.1:port").
+       // Otherwise it listens on all interfaces.
+       //
+       // If Address is "host:0", Start() updates Address to
+       // "host:port".
+       Address    string
+       AuthSecret string
+       Target     GatewayTarget
+       Log        interface {
                Printf(fmt string, args ...interface{})
        }
+       // If non-nil, set up a ContainerGatewayTunnel, so that the
+       // controller can connect to us even if our external IP
+       // address is unknown or not routable from controller.
+       ArvadosClient *arvados.Client
+
+       // When a tunnel is connected or reconnected, this func (if
+       // not nil) will be called with the InternalURL of the
+       // controller process at the other end of the tunnel.
+       UpdateTunnelURL func(url string)
 
        sshConfig   ssh.ServerConfig
        requestAuth string
@@ -99,7 +125,22 @@ func (gw *Gateway) Start() error {
        // from arvados-controller, and PORT is either the desired
        // port where we should run our gateway server, or "0" if we
        // should choose an available port.
-       host, port, err := net.SplitHostPort(gw.Address)
+       extAddr := gw.Address
+       // Generally we can't know which local interface corresponds
+       // to an externally reachable IP address, so if we expect to
+       // be reachable by external hosts, we listen on all
+       // interfaces.
+       listenHost := ""
+       if extAddr == "" {
+               // If the dispatcher doesn't tell us our external IP
+               // address, controller will only be able to connect
+               // through the tunnel (see runTunnel), so our gateway
+               // server only needs to listen on the loopback
+               // interface.
+               extAddr = "127.0.0.1:0"
+               listenHost = "127.0.0.1"
+       }
+       extHost, extPort, err := net.SplitHostPort(extAddr)
        if err != nil {
                return err
        }
@@ -121,26 +162,104 @@ func (gw *Gateway) Start() error {
                                Certificates: []tls.Certificate{cert},
                        },
                },
-               Addr: ":" + port,
+               Addr: net.JoinHostPort(listenHost, extPort),
        }
        err = srv.Start()
        if err != nil {
                return err
        }
-       // Get the port number we are listening on (the port might be
+       go func() {
+               err := srv.Wait()
+               gw.Log.Printf("gateway server stopped: %s", err)
+       }()
+       // Get the port number we are listening on (extPort might be
        // "0" or a port name, in which case this will be different).
-       _, port, err = net.SplitHostPort(srv.Addr)
+       _, listenPort, err := net.SplitHostPort(srv.Addr)
        if err != nil {
                return err
        }
-       // When changing state to Running, we will set
-       // gateway_address to "HOST:PORT" where HOST is our
-       // external hostname/IP as provided by arvados-dispatch-cloud,
-       // and PORT is the port number we ended up listening on.
-       gw.Address = net.JoinHostPort(host, port)
+       // When changing state to Running, the caller will want to set
+       // gateway_address to a "HOST:PORT" that, if controller
+       // connects to it, will reach this gateway server.
+       //
+       // The most likely thing to work is: HOST is our external
+       // hostname/IP as provided by the caller
+       // (arvados-dispatch-cloud) or 127.0.0.1 to indicate
+       // non-tunnel connections aren't available; and PORT is the
+       // port number we are listening on.
+       gw.Address = net.JoinHostPort(extHost, listenPort)
+       gw.Log.Printf("gateway server listening at %s", gw.Address)
+       if gw.ArvadosClient != nil {
+               go gw.maintainTunnel(gw.Address)
+       }
        return nil
 }
 
+func (gw *Gateway) maintainTunnel(addr string) {
+       for ; ; time.Sleep(5 * time.Second) {
+               err := gw.runTunnel(addr)
+               gw.Log.Printf("runTunnel: %s", err)
+       }
+}
+
+// runTunnel connects to controller and sets up a tunnel through
+// which controller can connect to the gateway server at the given
+// addr.
+func (gw *Gateway) runTunnel(addr string) error {
+       ctx := auth.NewContext(context.Background(), auth.NewCredentials(gw.ArvadosClient.AuthToken))
+       arpc := rpc.NewConn("", &url.URL{Scheme: "https", Host: gw.ArvadosClient.APIHost}, gw.ArvadosClient.Insecure, rpc.PassthroughTokenProvider)
+       tun, err := arpc.ContainerGatewayTunnel(ctx, arvados.ContainerGatewayTunnelOptions{
+               UUID:       gw.ContainerUUID,
+               AuthSecret: gw.AuthSecret,
+       })
+       if err != nil {
+               return fmt.Errorf("error creating gateway tunnel: %s", err)
+       }
+       mux, err := yamux.Client(tun.Conn, nil)
+       if err != nil {
+               return fmt.Errorf("error setting up mux client end: %s", err)
+       }
+       if url := tun.Header.Get("X-Arvados-Internal-Url"); url != "" && gw.UpdateTunnelURL != nil {
+               gw.UpdateTunnelURL(url)
+       }
+       for {
+               muxconn, err := mux.AcceptStream()
+               if err != nil {
+                       return err
+               }
+               gw.Log.Printf("tunnel connection %d started", muxconn.StreamID())
+               go func() {
+                       defer muxconn.Close()
+                       gwconn, err := net.Dial("tcp", addr)
+                       if err != nil {
+                               gw.Log.Printf("tunnel connection %d: error connecting to %s: %s", muxconn.StreamID(), addr, err)
+                               return
+                       }
+                       defer gwconn.Close()
+                       var wg sync.WaitGroup
+                       wg.Add(2)
+                       go func() {
+                               defer wg.Done()
+                               _, err := io.Copy(gwconn, muxconn)
+                               if err != nil {
+                                       gw.Log.Printf("tunnel connection %d: mux end: %s", muxconn.StreamID(), err)
+                               }
+                               gwconn.Close()
+                       }()
+                       go func() {
+                               defer wg.Done()
+                               _, err := io.Copy(muxconn, gwconn)
+                               if err != nil {
+                                       gw.Log.Printf("tunnel connection %d: gateway end: %s", muxconn.StreamID(), err)
+                               }
+                               muxconn.Close()
+                       }()
+                       wg.Wait()
+                       gw.Log.Printf("tunnel connection %d finished", muxconn.StreamID())
+               }()
+       }
+}
+
 // handleSSH connects to an SSH server that allows the caller to run
 // interactive commands as root (or any other desired user) inside the
 // container. The tunnel itself can only be created by an
@@ -166,11 +285,12 @@ func (gw *Gateway) handleSSH(w http.ResponseWriter, req *http.Request) {
        // In future we'll handle browser traffic too, but for now the
        // only traffic we expect is an SSH tunnel from
        // (*lib/controller/localdb.Conn)ContainerSSH()
-       if req.Method != "GET" || req.Header.Get("Upgrade") != "ssh" {
+       if req.Method != "POST" || req.Header.Get("Upgrade") != "ssh" {
                http.Error(w, "path not found", http.StatusNotFound)
                return
        }
-       if want := req.Header.Get("X-Arvados-Target-Uuid"); want != gw.ContainerUUID {
+       req.ParseForm()
+       if want := req.Form.Get("uuid"); want != gw.ContainerUUID {
                http.Error(w, fmt.Sprintf("misdirected request: meant for %q but received by crunch-run %q", want, gw.ContainerUUID), http.StatusBadGateway)
                return
        }
@@ -178,8 +298,8 @@ func (gw *Gateway) handleSSH(w http.ResponseWriter, req *http.Request) {
                http.Error(w, "bad X-Arvados-Authorization header", http.StatusUnauthorized)
                return
        }
-       detachKeys := req.Header.Get("X-Arvados-Detach-Keys")
-       username := req.Header.Get("X-Arvados-Login-Username")
+       detachKeys := req.Form.Get("detach_keys")
+       username := req.Form.Get("login_username")
        if username == "" {
                username = "root"
        }
@@ -204,7 +324,9 @@ func (gw *Gateway) handleSSH(w http.ResponseWriter, req *http.Request) {
        ctx := req.Context()
 
        conn, newchans, reqs, err := ssh.NewServerConn(netconn, &gw.sshConfig)
-       if err != nil {
+       if err == io.EOF {
+               return
+       } else if err != nil {
                gw.Log.Printf("ssh.NewServerConn: %s", err)
                return
        }
@@ -278,9 +400,11 @@ func (gw *Gateway) handleDirectTCPIP(ctx context.Context, newch ssh.NewChannel)
 func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, detachKeys, username string) {
        ch, reqs, err := newch.Accept()
        if err != nil {
-               gw.Log.Printf("accept session channel: %s", err)
+               gw.Log.Printf("error accepting session channel: %s", err)
                return
        }
+       defer ch.Close()
+
        var pty0, tty0 *os.File
        // Where to send errors/messages for the client to see
        logw := io.Writer(ch.Stderr())
@@ -289,10 +413,28 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
        eol := "\n"
        // Env vars to add to child process
        termEnv := []string(nil)
-       for req := range reqs {
+
+       started := 0
+       wantClose := make(chan struct{})
+       for {
+               var req *ssh.Request
+               select {
+               case r, ok := <-reqs:
+                       if !ok {
+                               return
+                       }
+                       req = r
+               case <-wantClose:
+                       return
+               }
                ok := false
                switch req.Type {
                case "shell", "exec":
+                       if started++; started != 1 {
+                               // RFC 4254 6.5: "Only one of these
+                               // requests can succeed per channel."
+                               break
+                       }
                        ok = true
                        var payload struct {
                                Command string
@@ -312,7 +454,7 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
                                }
                                defer func() {
                                        ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
-                                       ch.Close()
+                                       close(wantClose)
                                }()
 
                                cmd, err := gw.Target.InjectCommand(ctx, detachKeys, username, tty0 != nil, execargs)
@@ -322,20 +464,39 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
                                        resp.Status = 1
                                        return
                                }
-                               cmd.Stdin = ch
-                               cmd.Stdout = ch
-                               cmd.Stderr = ch.Stderr()
                                if tty0 != nil {
                                        cmd.Stdin = tty0
                                        cmd.Stdout = tty0
                                        cmd.Stderr = tty0
-                                       var wg sync.WaitGroup
-                                       defer wg.Wait()
-                                       wg.Add(2)
-                                       go func() { io.Copy(ch, pty0); wg.Done() }()
-                                       go func() { io.Copy(pty0, ch); wg.Done() }()
+                                       go io.Copy(ch, pty0)
+                                       go io.Copy(pty0, ch)
                                        // Send our own debug messages to tty as well.
                                        logw = tty0
+                               } else {
+                                       // StdinPipe may seem
+                                       // superfluous here, but it's
+                                       // not: it causes cmd.Run() to
+                                       // return when the subprocess
+                                       // exits. Without it, Run()
+                                       // waits for stdin to close,
+                                       // which causes "ssh ... echo
+                                       // ok" (with the client's
+                                       // stdin connected to a
+                                       // terminal or something) to
+                                       // hang.
+                                       stdin, err := cmd.StdinPipe()
+                                       if err != nil {
+                                               fmt.Fprintln(ch.Stderr(), err)
+                                               ch.CloseWrite()
+                                               resp.Status = 1
+                                               return
+                                       }
+                                       go func() {
+                                               io.Copy(stdin, ch)
+                                               stdin.Close()
+                                       }()
+                                       cmd.Stdout = ch
+                                       cmd.Stderr = ch.Stderr()
                                }
                                cmd.SysProcAttr = &syscall.SysProcAttr{
                                        Setctty: tty0 != nil,
@@ -403,7 +564,7 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
                        // would be a gaping security
                        // hole).
                default:
-                       // fmt.Fprintf(logw, "declining %q req"+eol, req.Type)
+                       // fmt.Fprintf(logw, "declined request %q on ssh channel"+eol, req.Type)
                }
                if req.WantReply {
                        req.Reply(ok, nil)
index 68181395fadcbafba4227b0d7cc16eb4b2f8624e..ee9115d8d809903be17cbaa10dc4010d1b7d87dc 100644 (file)
@@ -140,6 +140,7 @@ type ContainerRunner struct {
        MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
        finalState    string
        parentTemp    string
+       costStartTime time.Time
 
        keepstoreLogger  io.WriteCloser
        keepstoreLogbuf  *bufThenWrite
@@ -1457,6 +1458,10 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
        if runner.finalState == "Complete" && runner.OutputPDH != nil {
                update["output"] = *runner.OutputPDH
        }
+       var it arvados.InstanceType
+       if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+               update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds()
+       }
        return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
 }
 
@@ -1489,6 +1494,7 @@ func (runner *ContainerRunner) Run() (err error) {
        runner.CrunchLog.Printf("Using FUSE mount: %s", v)
        runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
        runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+       runner.costStartTime = time.Now()
 
        hostname, hosterr := os.Hostname()
        if hosterr != nil {
@@ -1744,6 +1750,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
        brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
        flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+       version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
 
        ignoreDetachFlag := false
        if len(args) > 0 && args[0] == "-no-detach" {
@@ -1759,6 +1766,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
                return code
+       } else if *version {
+               fmt.Fprintln(stdout, prog, cmd.Version.String())
+               return 0
        } else if !*list && flags.NArg() != 1 {
                fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
                return 2
@@ -1906,11 +1916,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                // not safe to run a gateway service without an auth
                // secret
                cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
-       } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
-               // dispatcher did not tell us which external IP
-               // address to advertise --> no gateway service
-               cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
        } else {
+               gwListen := os.Getenv("GatewayAddress")
                cr.gateway = Gateway{
                        Address:       gwListen,
                        AuthSecret:    gwAuthSecret,
@@ -1918,6 +1925,18 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        Target:        cr.executor,
                        Log:           cr.CrunchLog,
                }
+               if gwListen == "" {
+                       // Direct connection won't work, so we use the
+                       // gateway_address field to indicate the
+                       // internalURL of the controller process that
+                       // has the current tunnel connection.
+                       cr.gateway.ArvadosClient = cr.dispatcherClient
+                       cr.gateway.UpdateTunnelURL = func(url string) {
+                               cr.gateway.Address = "tunnel " + url
+                               cr.DispatcherArvClient.Update("containers", containerUUID,
+                                       arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+                       }
+               }
                err = cr.gateway.Start()
                if err != nil {
                        log.Printf("error starting gateway server: %s", err)
index ea6e610d8b7921f97532461cc75dcea4427318f6..e757f579fe957eff333892c2fc7a12de9fff82e2 100644 (file)
@@ -8,12 +8,14 @@ import (
        "bytes"
        "fmt"
        "io"
+       "io/ioutil"
        "net"
        "net/http"
        "os"
        "strings"
        "time"
 
+       "git.arvados.org/arvados.git/lib/diagnostics"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "golang.org/x/net/context"
@@ -63,6 +65,21 @@ func (s *executorSuite) TestExecTrivialContainer(c *C) {
        c.Check(s.stderr.String(), Equals, "")
 }
 
+func (s *executorSuite) TestDiagnosticsImage(c *C) {
+       s.newExecutor(c)
+       imagefile := c.MkDir() + "/hello-world.tar"
+       err := ioutil.WriteFile(imagefile, diagnostics.HelloWorldDockerImage, 0777)
+       c.Assert(err, IsNil)
+       err = s.executor.LoadImage("", imagefile, arvados.Container{}, "", nil)
+       c.Assert(err, IsNil)
+
+       c.Logf("Using container runtime: %s", s.executor.Runtime())
+       s.spec.Image = "hello-world"
+       s.spec.Command = []string{"/hello"}
+       s.checkRun(c, 0)
+       c.Check(s.stdout.String(), Matches, `(?ms)\nHello from Docker!\n.*`)
+}
+
 func (s *executorSuite) TestExitStatus(c *C) {
        s.spec.Command = []string{"false"}
        s.checkRun(c, 1)
index 9860c7949727b169ccc1df66e15e4f223dc7e7cd..d569020824c22373d5098e0afd4c14d6156dd773 100644 (file)
@@ -229,13 +229,25 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
                c.Check(s.logFiles["crunch-run.txt"], Not(Matches), `(?ms).* at http://169\.254\..*`)
                c.Check(s.logFiles["stderr.txt"], Matches, `(?ms).*ARVADOS_KEEP_SERVICES=http://[\d\.]{7,}:\d+\n.*`)
        }
+}
 
+func (s *integrationSuite) TestRunTrivialContainerWithNoLocalKeepstore(c *C) {
        // Check that (1) config is loaded from $ARVADOS_CONFIG when
        // not provided on stdin and (2) if a local keepstore is not
        // started, crunch-run.txt explains why not.
        s.SetUpTest(c)
        s.stdin.Reset()
        s.testRunTrivialContainer(c)
+       c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because KeepBuffers=0 in config\n.*`)
+
+       s.SetUpTest(c)
+       s.args = []string{"-config", c.MkDir() + "/config.yaml"}
+       s.stdin.Reset()
+       buf, err := ioutil.ReadFile(os.Getenv("ARVADOS_CONFIG"))
+       c.Assert(err, IsNil)
+       err = ioutil.WriteFile(s.args[1], bytes.Replace(buf, []byte("LocalKeepBlobBuffersPerVCPU: 0"), []byte("LocalKeepBlobBuffersPerVCPU: 1"), -1), 0666)
+       c.Assert(err, IsNil)
+       s.testRunTrivialContainer(c)
        c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because a volume \(zzzzz-nyw5e-00000000000000\d\) uses AccessViaHosts\n.*`)
 
        // Check that config read errors are logged
@@ -248,7 +260,7 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
        s.SetUpTest(c)
        s.args = []string{"-config", c.MkDir() + "/config-unreadable.yaml"}
        s.stdin.Reset()
-       err := ioutil.WriteFile(s.args[1], []byte{}, 0)
+       err = ioutil.WriteFile(s.args[1], []byte{}, 0)
        c.Check(err, IsNil)
        s.testRunTrivialContainer(c)
        c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* permission denied\n.*`)
index 3455d3307e71bff663aa44ea3023bf7b71efca26..799abf9da4e278bc7f2f4150e7f284c991c677c4 100644 (file)
@@ -5,8 +5,10 @@
 package diagnostics
 
 import (
+       "archive/tar"
        "bytes"
        "context"
+       _ "embed"
        "flag"
        "fmt"
        "io"
@@ -30,7 +32,7 @@ func (Command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        f := flag.NewFlagSet(prog, flag.ContinueOnError)
        f.StringVar(&diag.projectName, "project-name", "scratch area for diagnostics", "name of project to find/create in home project and use for temporary/test objects")
        f.StringVar(&diag.logLevel, "log-level", "info", "logging level (debug, info, warning, error)")
-       f.StringVar(&diag.dockerImage, "docker-image", "alpine:latest", "image to use when running a test container")
+       f.StringVar(&diag.dockerImage, "docker-image", "", "image to use when running a test container (default: use embedded hello-world image)")
        f.BoolVar(&diag.checkInternal, "internal-client", false, "check that this host is considered an \"internal\" client")
        f.BoolVar(&diag.checkExternal, "external-client", false, "check that this host is considered an \"external\" client")
        f.IntVar(&diag.priority, "priority", 500, "priority for test container (1..1000, or 0 to skip)")
@@ -55,6 +57,10 @@ func (Command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
 }
 
+// docker save hello-world > hello-world.tar
+//go:embed hello-world.tar
+var HelloWorldDockerImage []byte
+
 type diagnoser struct {
        stdout        io.Writer
        stderr        io.Writer
@@ -368,13 +374,38 @@ func (diag *diagnoser) runtests() {
                }()
        }
 
+       // Read hello-world.tar to find image ID, so we can upload it
+       // as "sha256:{...}.tar"
+       var imageSHA2 string
+       {
+               tr := tar.NewReader(bytes.NewReader(HelloWorldDockerImage))
+               for {
+                       hdr, err := tr.Next()
+                       if err == io.EOF {
+                               break
+                       }
+                       if err != nil {
+                               diag.errorf("internal error/bug: cannot read embedded docker image tar file: %s", err)
+                               return
+                       }
+                       if s := strings.TrimSuffix(hdr.Name, ".json"); len(s) == 64 && s != hdr.Name {
+                               imageSHA2 = s
+                       }
+               }
+               if imageSHA2 == "" {
+                       diag.errorf("internal error/bug: cannot find {sha256}.json file in embedded docker image tar file")
+                       return
+               }
+       }
+       tarfilename := "sha256:" + imageSHA2 + ".tar"
+
        diag.dotest(100, "uploading file via webdav", func() error {
                ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(diag.timeout))
                defer cancel()
                if collection.UUID == "" {
                        return fmt.Errorf("skipping, no test collection")
                }
-               req, err := http.NewRequestWithContext(ctx, "PUT", cluster.Services.WebDAVDownload.ExternalURL.String()+"c="+collection.UUID+"/testfile", bytes.NewBufferString("testfiledata"))
+               req, err := http.NewRequestWithContext(ctx, "PUT", cluster.Services.WebDAVDownload.ExternalURL.String()+"c="+collection.UUID+"/"+tarfilename, bytes.NewReader(HelloWorldDockerImage))
                if err != nil {
                        return fmt.Errorf("BUG? http.NewRequest: %s", err)
                }
@@ -415,11 +446,11 @@ func (diag *diagnoser) runtests() {
                fileurl      string
        }{
                {false, false, http.StatusNotFound, strings.Replace(davurl.String(), "*", "d41d8cd98f00b204e9800998ecf8427e-0", 1) + "foo"},
-               {false, false, http.StatusNotFound, strings.Replace(davurl.String(), "*", "d41d8cd98f00b204e9800998ecf8427e-0", 1) + "testfile"},
+               {false, false, http.StatusNotFound, strings.Replace(davurl.String(), "*", "d41d8cd98f00b204e9800998ecf8427e-0", 1) + tarfilename},
                {false, false, http.StatusNotFound, cluster.Services.WebDAVDownload.ExternalURL.String() + "c=d41d8cd98f00b204e9800998ecf8427e+0/_/foo"},
-               {false, false, http.StatusNotFound, cluster.Services.WebDAVDownload.ExternalURL.String() + "c=d41d8cd98f00b204e9800998ecf8427e+0/_/testfile"},
-               {true, true, http.StatusOK, strings.Replace(davurl.String(), "*", strings.Replace(collection.PortableDataHash, "+", "-", -1), 1) + "testfile"},
-               {true, false, http.StatusOK, cluster.Services.WebDAVDownload.ExternalURL.String() + "c=" + collection.UUID + "/_/testfile"},
+               {false, false, http.StatusNotFound, cluster.Services.WebDAVDownload.ExternalURL.String() + "c=d41d8cd98f00b204e9800998ecf8427e+0/_/" + tarfilename},
+               {true, true, http.StatusOK, strings.Replace(davurl.String(), "*", strings.Replace(collection.PortableDataHash, "+", "-", -1), 1) + tarfilename},
+               {true, false, http.StatusOK, cluster.Services.WebDAVDownload.ExternalURL.String() + "c=" + collection.UUID + "/_/" + tarfilename},
        } {
                diag.dotest(120+i, fmt.Sprintf("downloading from webdav (%s)", trial.fileurl), func() error {
                        if trial.needWildcard && !davWildcard {
@@ -448,8 +479,13 @@ func (diag *diagnoser) runtests() {
                        if resp.StatusCode != trial.status {
                                return fmt.Errorf("unexpected response status: %s", resp.Status)
                        }
-                       if trial.status == http.StatusOK && string(body) != "testfiledata" {
-                               return fmt.Errorf("unexpected response content: %q", body)
+                       if trial.status == http.StatusOK && !bytes.Equal(body, HelloWorldDockerImage) {
+                               excerpt := body
+                               if len(excerpt) > 128 {
+                                       excerpt = append([]byte(nil), body[:128]...)
+                                       excerpt = append(excerpt, []byte("[...]")...)
+                               }
+                               return fmt.Errorf("unexpected response content: len %d, %q", len(body), excerpt)
                        }
                        return nil
                })
@@ -555,16 +591,25 @@ func (diag *diagnoser) runtests() {
                        return fmt.Errorf("skipping, no project to work in")
                }
 
+               timestamp := time.Now().Format(time.RFC3339)
+               ctrCommand := []string{"echo", timestamp}
+               if diag.dockerImage == "" {
+                       if collection.UUID == "" {
+                               return fmt.Errorf("skipping, no test collection to use as docker image")
+                       }
+                       diag.dockerImage = collection.PortableDataHash
+                       ctrCommand = []string{"/hello"}
+               }
+
                var cr arvados.ContainerRequest
                ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(diag.timeout))
                defer cancel()
 
-               timestamp := time.Now().Format(time.RFC3339)
                err := client.RequestAndDecodeContext(ctx, &cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{"container_request": map[string]interface{}{
                        "owner_uuid":      project.UUID,
                        "name":            fmt.Sprintf("diagnostics container request %s", timestamp),
                        "container_image": diag.dockerImage,
-                       "command":         []string{"echo", timestamp},
+                       "command":         ctrCommand,
                        "use_existing":    false,
                        "output_path":     "/mnt/output",
                        "output_name":     fmt.Sprintf("diagnostics output %s", timestamp),
diff --git a/lib/diagnostics/hello-world.tar b/lib/diagnostics/hello-world.tar
new file mode 100644 (file)
index 0000000..60b50ea
Binary files /dev/null and b/lib/diagnostics/hello-world.tar differ
index 27b8d1dc8aedb70c9c5720ace830bc3bbaabd10a..e02c3743e71809a40053983d1e753e238577f2ec 100644 (file)
@@ -157,6 +157,7 @@ func (inst *installCommand) RunCommand(prog string, args []string, stdin io.Read
                        "libattr1-dev",
                        "libcrypt-ssleay-perl",
                        "libfuse-dev",
+                       "libgbm1", // cypress / workbench2 tests
                        "libgnutls28-dev",
                        "libjson-perl",
                        "libpam-dev",
index 36501adf81e60c6b4f9c8f7919f9ee57c5c13581..c362c32b872bec65d7c8f79ad5cb29b8c04f9a8c 100644 (file)
@@ -402,29 +402,6 @@ func (initcmd *initCommand) RunCommand(prog string, args []string, stdin io.Read
                fmt.Fprintln(stderr, "...looks good")
        }
 
-       if out, err := exec.CommandContext(ctx, "docker", "version").CombinedOutput(); err == nil && strings.Contains(string(out), "\nServer:\n") {
-               fmt.Fprintln(stderr, "loading alpine docker image for diagnostics...")
-               cmd := exec.CommandContext(ctx, "docker", "pull", "alpine")
-               cmd.Stdout = stderr
-               cmd.Stderr = stderr
-               err = cmd.Run()
-               if err != nil {
-                       err = fmt.Errorf("%v: %w", cmd.Args, err)
-                       return 1
-               }
-               cmd = exec.CommandContext(ctx, "arv", "sudo", "keep", "docker", "alpine")
-               cmd.Stdout = stderr
-               cmd.Stderr = stderr
-               err = cmd.Run()
-               if err != nil {
-                       err = fmt.Errorf("%v: %w", cmd.Args, err)
-                       return 1
-               }
-               fmt.Fprintln(stderr, "...done")
-       } else {
-               fmt.Fprintln(stderr, "docker is not installed -- skipping step of downloading 'alpine' image")
-       }
-
        fmt.Fprintf(stderr, `
 Setup complete. Next steps:
 * run 'arv sudo diagnostics'
index 0d9324784d503e1fb30789e45e2f65ae7b84fdd1..e2348337e62992eb4463947690e809e1927bb232 100644 (file)
@@ -6,6 +6,8 @@ package lsf
 
 import (
        "context"
+       "crypto/hmac"
+       "crypto/sha256"
        "errors"
        "fmt"
        "math"
@@ -274,7 +276,12 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s
        var crArgs []string
        crArgs = append(crArgs, crunchRunCommand...)
        crArgs = append(crArgs, container.UUID)
-       crScript := execScript(crArgs)
+
+       h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken))
+       fmt.Fprint(h, container.UUID)
+       authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+       crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret})
 
        bsubArgs, err := disp.bsubArgs(container)
        if err != nil {
@@ -353,8 +360,14 @@ func (disp *dispatcher) checkLsfQueueForOrphans() {
        }
 }
 
-func execScript(args []string) []byte {
-       s := "#!/bin/sh\nexec"
+func execScript(args []string, env map[string]string) []byte {
+       s := "#!/bin/sh\n"
+       for k, v := range env {
+               s += k + `='`
+               s += strings.Replace(v, `'`, `'\''`, -1)
+               s += `' `
+       }
+       s += `exec`
        for _, w := range args {
                s += ` '`
                s += strings.Replace(w, `'`, `'\''`, -1)
index f20268d19a525ca0056bceb54ea7a2036b50bb3a..e5aa4e4f2a83c8224372259012355228338be607 100755 (executable)
@@ -172,7 +172,7 @@ def edit_and_commit_object initial_obj, tmp_stem, global_opts, &block
         # Load the new object
         newobj = case global_opts[:format]
                  when 'json'
-                   Oj.load(newcontent)
+                   Oj.safe_load(newcontent)
                  when 'yaml'
                    YAML.load(newcontent)
                  else
index 778af58ac3f7a1b71c040d5ec4f3332ecba11964..8635d5fcfed8490aa9c8e1e79eacc383d8d1f2e0 100644 (file)
@@ -252,6 +252,11 @@ The 'jobs' API is no longer supported.
         Called when there's a need to report errors, warnings or just
         activity statuses, for example in the RuntimeStatusLoggingHandler.
         """
+
+        if kind not in ('error', 'warning'):
+            # Ignore any other status kind
+            return
+
         with self.workflow_eval_lock:
             current = None
             try:
@@ -261,32 +266,35 @@ The 'jobs' API is no longer supported.
             if current is None:
                 return
             runtime_status = current.get('runtime_status', {})
-            if kind in ('error', 'warning'):
-                updatemessage = runtime_status.get(kind, "")
-                if not updatemessage:
-                    updatemessage = message
-
-                # Subsequent messages tacked on in detail
-                updatedetail = runtime_status.get(kind+'Detail', "")
-                maxlines = 40
-                if updatedetail.count("\n") < maxlines:
-                    if updatedetail:
-                        updatedetail += "\n"
-                    updatedetail += message + "\n"
-
-                    if detail:
-                        updatedetail += detail + "\n"
-
-                    if updatedetail.count("\n") >= maxlines:
-                        updatedetail += "\nSome messages may have been omitted.  Check the full log."
-
-                runtime_status.update({
-                    kind: updatemessage,
-                    kind+'Detail': updatedetail,
-                })
-            else:
-                # Ignore any other status kind
+
+            original_updatemessage = updatemessage = runtime_status.get(kind, "")
+            if not updatemessage:
+                updatemessage = message
+
+            # Subsequent messages tacked on in detail
+            original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
+            maxlines = 40
+            if updatedetail.count("\n") < maxlines:
+                if updatedetail:
+                    updatedetail += "\n"
+                updatedetail += message + "\n"
+
+                if detail:
+                    updatedetail += detail + "\n"
+
+                if updatedetail.count("\n") >= maxlines:
+                    updatedetail += "\nSome messages may have been omitted.  Check the full log."
+
+            if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
+                # don't waste time doing an update if nothing changed
+                # (usually because we exceeded the max lines)
                 return
+
+            runtime_status.update({
+                kind: updatemessage,
+                kind+'Detail': updatedetail,
+            })
+
             try:
                 self.api.containers().update(uuid=current['uuid'],
                                             body={
@@ -565,8 +573,9 @@ The 'jobs' API is no longer supported.
         self.project_uuid = runtimeContext.project_uuid
 
         # Upload local file references in the job order.
-        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
-                                     updated_tool, job_order, runtimeContext)
+        with Perf(metrics, "upload_job_order"):
+            job_order = upload_job_order(self, "%s input" % runtimeContext.name,
+                                         updated_tool, job_order, runtimeContext)
 
         # the last clause means: if it is a command line tool, and we
         # are going to wait for the result, and always_submit_runner
@@ -581,19 +590,23 @@ The 'jobs' API is no longer supported.
 
         loadingContext = self.loadingContext.copy()
         loadingContext.do_validate = False
+        loadingContext.disable_js_validation = True
         if submitting:
             loadingContext.do_update = False
             # Document may have been auto-updated. Reload the original
             # document with updating disabled because we want to
             # submit the document with its original CWL version, not
             # the auto-updated one.
-            tool = load_tool(updated_tool.tool["id"], loadingContext)
+            with Perf(metrics, "load_tool original"):
+                tool = load_tool(updated_tool.tool["id"], loadingContext)
         else:
             tool = updated_tool
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
-        merged_map = upload_workflow_deps(self, tool, runtimeContext)
+        logger.info("Uploading workflow dependencies")
+        with Perf(metrics, "upload_workflow_deps"):
+            merged_map = upload_workflow_deps(self, tool, runtimeContext)
 
         # Recreate process object (ArvadosWorkflow or
         # ArvadosCommandTool) because tool document may have been
@@ -602,7 +615,8 @@ The 'jobs' API is no longer supported.
         loadingContext.loader = tool.doc_loader
         loadingContext.avsc_names = tool.doc_schema
         loadingContext.metadata = tool.metadata
-        tool = load_tool(tool.tool, loadingContext)
+        with Perf(metrics, "load_tool"):
+            tool = load_tool(tool.tool, loadingContext)
 
         if runtimeContext.update_workflow or runtimeContext.create_workflow:
             # Create a pipeline template or workflow record and exit.
index 644713bce25385938df289dbdcb4cf68b77f3ca5..225f4ae60ed9b5cdfe83f9ffa240e8dcf08da5f5 100644 (file)
@@ -17,7 +17,30 @@ import json
 import copy
 from collections import namedtuple
 from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Sequence,
+    MutableSequence,
+    Optional,
+    Set,
+    Sized,
+    Tuple,
+    Type,
+    Union,
+    cast,
+)
+from cwltool.utils import (
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+)
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -49,8 +72,10 @@ from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, col
 from ._version import __version__
 from . import done
 from . context import ArvRuntimeContext
+from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
 def trim_anonymous_location(obj):
     """Remove 'location' field from File and Directory literals.
@@ -228,23 +253,33 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
                 if sfname is None:
                     continue
 
-                p_location = primary["location"]
-                if "/" in p_location:
-                    sfpath = (
-                        p_location[0 : p_location.rindex("/") + 1]
-                        + sfname
-                    )
+                if isinstance(sfname, str):
+                    p_location = primary["location"]
+                    if "/" in p_location:
+                        sfpath = (
+                            p_location[0 : p_location.rindex("/") + 1]
+                            + sfname
+                        )
 
             required = builder.do_eval(required, context=primary)
 
-            if fsaccess.exists(sfpath):
-                if pattern is not None:
-                    found.append({"location": sfpath, "class": "File"})
-                else:
-                    found.append(sf)
-            elif required:
-                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
-                    "Required secondary file '%s' does not exist" % sfpath)
+            if isinstance(sfname, list) or isinstance(sfname, dict):
+                each = aslist(sfname)
+                for e in each:
+                    if required and not fsaccess.exists(e.get("location")):
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "Required secondary file '%s' does not exist" % e.get("location"))
+                found.extend(each)
+
+            if isinstance(sfname, str):
+                if fsaccess.exists(sfpath):
+                    if pattern is not None:
+                        found.append({"location": sfpath, "class": "File"})
+                    else:
+                        found.append(sf)
+                elif required:
+                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                        "Required secondary file '%s' does not exist" % sfpath)
 
         primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
@@ -260,7 +295,8 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run, runtimeContext,
-                        include_primary=True, discovered_secondaryfiles=None):
+                        include_primary=True, discovered_secondaryfiles=None,
+                        cache=None):
     """Upload the dependencies of the workflowobj document to Keep.
 
     Returns a pathmapper object mapping local paths to keep references.  Also
@@ -279,6 +315,8 @@ def upload_dependencies(arvrunner, name, document_loader,
         defrg, _ = urllib.parse.urldefrag(joined)
         if defrg not in loaded:
             loaded.add(defrg)
+            if cache is not None and defrg in cache:
+                return cache[defrg]
             # Use fetch_text to get raw file (before preprocessing).
             text = document_loader.fetch_text(defrg)
             if isinstance(text, bytes):
@@ -286,7 +324,10 @@ def upload_dependencies(arvrunner, name, document_loader,
             else:
                 textIO = StringIO(text)
             yamlloader = YAML(typ='safe', pure=True)
-            return yamlloader.load(textIO)
+            result = yamlloader.load(textIO)
+            if cache is not None:
+                cache[defrg] = result
+            return result
         else:
             return {}
 
@@ -297,25 +338,37 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     scanobj = workflowobj
     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
-        # Need raw file content (before preprocessing) to ensure
-        # that external references in $include and $mixin are captured.
-        scanobj = loadref("", workflowobj["id"])
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if cache is not None and defrg not in cache:
+            # if we haven't seen this file before, want raw file
+            # content (before preprocessing) to ensure that external
+            # references like $include haven't already been inlined.
+            scanobj = loadref("", workflowobj["id"])
 
     metadata = scanobj
 
-    sc_result = scandeps(uri, scanobj,
-                         loadref_fields,
-                         set(("$include", "location")),
-                         loadref, urljoin=document_loader.fetcher.urljoin,
-                         nestdirs=False)
+    with Perf(metrics, "scandeps include, location"):
+        sc_result = scandeps(uri, scanobj,
+                             loadref_fields,
+                             set(("$include", "location")),
+                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
 
-    optional_deps = scandeps(uri, scanobj,
-                                  loadref_fields,
-                                  set(("$schemas",)),
-                                  loadref, urljoin=document_loader.fetcher.urljoin,
-                                  nestdirs=False)
+    with Perf(metrics, "scandeps $schemas"):
+        optional_deps = scandeps(uri, scanobj,
+                                      loadref_fields,
+                                      set(("$schemas",)),
+                                      loadref, urljoin=document_loader.fetcher.urljoin,
+                                      nestdirs=False)
 
-    sc_result.extend(optional_deps)
+    if sc_result is None:
+        sc_result = []
+
+    if optional_deps is None:
+        optional_deps = []
+
+    if optional_deps:
+        sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
@@ -343,35 +396,45 @@ def upload_dependencies(arvrunner, name, document_loader,
             sc.append(obj)
         collect_uuids(obj)
 
-    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
-    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+    with Perf(metrics, "collect uuids"):
+        visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+    with Perf(metrics, "collect uploads"):
+        visit_class(sc_result, ("File", "Directory"), collect_uploads)
 
     # Resolve any collection uuids we found to portable data hashes
     # and assign them to uuid_map
     uuid_map = {}
     fetch_uuids = list(uuids.keys())
-    while fetch_uuids:
-        # For a large number of fetch_uuids, API server may limit
-        # response size, so keep fetching from API server has nothing
-        # more to give us.
-        lookups = arvrunner.api.collections().list(
-            filters=[["uuid", "in", fetch_uuids]],
-            count="none",
-            select=["uuid", "portable_data_hash"]).execute(
-                num_retries=arvrunner.num_retries)
+    with Perf(metrics, "fetch_uuids"):
+        while fetch_uuids:
+            # For a large number of fetch_uuids, API server may limit
+            # response size, so keep fetching from API server has nothing
+            # more to give us.
+            lookups = arvrunner.api.collections().list(
+                filters=[["uuid", "in", fetch_uuids]],
+                count="none",
+                select=["uuid", "portable_data_hash"]).execute(
+                    num_retries=arvrunner.num_retries)
 
-        if not lookups["items"]:
-            break
+            if not lookups["items"]:
+                break
 
-        for l in lookups["items"]:
-            uuid_map[l["uuid"]] = l["portable_data_hash"]
+            for l in lookups["items"]:
+                uuid_map[l["uuid"]] = l["portable_data_hash"]
 
-        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
-    if include_primary and "id" in workflowobj:
-        sc.append({"class": "File", "location": workflowobj["id"]})
+    if "id" in workflowobj:
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if include_primary:
+            # make sure it's included
+            sc.append({"class": "File", "location": defrg})
+        else:
+            # make sure it's excluded
+            sc = [d for d in sc if d.get("location") != defrg]
 
     def visit_default(obj):
         def defaults_are_optional(f):
@@ -412,12 +475,13 @@ def upload_dependencies(arvrunner, name, document_loader,
         else:
             del discovered[d]
 
-    mapper = ArvPathMapper(arvrunner, sc, "",
-                           "keep:%s",
-                           "keep:%s/%s",
-                           name=name,
-                           single_collection=True,
-                           optional_deps=optional_deps)
+    with Perf(metrics, "mapper"):
+        mapper = ArvPathMapper(arvrunner, sc, "",
+                               "keep:%s",
+                               "keep:%s/%s",
+                               name=name,
+                               single_collection=True,
+                               optional_deps=optional_deps)
 
     keeprefs = set()
     def addkeepref(k):
@@ -461,8 +525,9 @@ def upload_dependencies(arvrunner, name, document_loader,
         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
         p[collectionUUID] = uuid
 
-    visit_class(workflowobj, ("File", "Directory"), setloc)
-    visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), setloc)
+        visit_class(discovered, ("File", "Directory"), setloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
@@ -647,24 +712,27 @@ FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    upload_docker(arvrunner, tool, runtimeContext)
+    with Perf(metrics, "upload_docker"):
+        upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
 
     merged_map = {}
-
+    tool_dep_cache = {}
     def upload_tool_deps(deptool):
         if "id" in deptool:
             discovered_secondaryfiles = {}
-            pm = upload_dependencies(arvrunner,
-                                     "%s dependencies" % (shortname(deptool["id"])),
-                                     document_loader,
-                                     deptool,
-                                     deptool["id"],
-                                     False,
-                                     runtimeContext,
-                                     include_primary=False,
-                                     discovered_secondaryfiles=discovered_secondaryfiles)
+            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+                pm = upload_dependencies(arvrunner,
+                                         "%s dependencies" % (shortname(deptool["id"])),
+                                         document_loader,
+                                         deptool,
+                                         deptool["id"],
+                                         False,
+                                         runtimeContext,
+                                         include_primary=False,
+                                         discovered_secondaryfiles=discovered_secondaryfiles,
+                                         cache=tool_dep_cache)
             document_loader.idx[deptool["id"]] = deptool
             toolmap = {}
             for k,v in pm.items():
index c885ebd4b1303a1fb9cb02d6b5918719d2c01b16..66cda19f4012fc7754a4286d81dfe746c901cad6 100644 (file)
@@ -36,12 +36,13 @@ setup(name='arvados-cwl-runner',
       # file to determine what version of cwltool and schema-salad to
       # build.
       install_requires=[
-          'cwltool==3.1.20220224085855',
-          'schema-salad==8.2.20211116214159',
+          'cwltool==3.1.20220623174452',
+          'schema-salad==8.3.20220801194920',
           'arvados-python-client{}'.format(pysdk_dep),
           'setuptools',
           'ciso8601 >= 2.0.0',
-          'networkx < 2.6'
+          'networkx < 2.6',
+          'msgpack==1.0.3'
       ],
       data_files=[
           ('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
index d76ece1eddb4560f47f9d7892313e1f9e3882746..3797a17f50d504ae2894ac4c6a68f598b4e37564 100644 (file)
@@ -10,6 +10,7 @@ import (
        "encoding/json"
        "io"
        "net"
+       "net/http"
 
        "github.com/sirupsen/logrus"
 )
@@ -47,7 +48,8 @@ var (
        EndpointContainerDelete               = APIEndpoint{"DELETE", "arvados/v1/containers/{uuid}", ""}
        EndpointContainerLock                 = APIEndpoint{"POST", "arvados/v1/containers/{uuid}/lock", ""}
        EndpointContainerUnlock               = APIEndpoint{"POST", "arvados/v1/containers/{uuid}/unlock", ""}
-       EndpointContainerSSH                  = APIEndpoint{"GET", "arvados/v1/connect/{uuid}/ssh", ""} // move to /containers after #17014 fixes routing
+       EndpointContainerSSH                  = APIEndpoint{"POST", "arvados/v1/connect/{uuid}/ssh", ""}            // move to /containers after #17014 fixes routing
+       EndpointContainerGatewayTunnel        = APIEndpoint{"POST", "arvados/v1/connect/{uuid}/gateway_tunnel", ""} // move to /containers after #17014 fixes routing
        EndpointContainerRequestCreate        = APIEndpoint{"POST", "arvados/v1/container_requests", "container_request"}
        EndpointContainerRequestUpdate        = APIEndpoint{"PATCH", "arvados/v1/container_requests/{uuid}", "container_request"}
        EndpointContainerRequestGet           = APIEndpoint{"GET", "arvados/v1/container_requests/{uuid}", ""}
@@ -96,12 +98,19 @@ type ContainerSSHOptions struct {
        UUID          string `json:"uuid"`
        DetachKeys    string `json:"detach_keys"`
        LoginUsername string `json:"login_username"`
+       NoForward     bool   `json:"no_forward"`
 }
 
-type ContainerSSHConnection struct {
+type ConnectionResponse struct {
        Conn   net.Conn           `json:"-"`
        Bufrw  *bufio.ReadWriter  `json:"-"`
        Logger logrus.FieldLogger `json:"-"`
+       Header http.Header        `json:"-"`
+}
+
+type ContainerGatewayTunnelOptions struct {
+       UUID       string `json:"uuid"`
+       AuthSecret string `json:"auth_secret"`
 }
 
 type GetOptions struct {
@@ -254,7 +263,8 @@ type API interface {
        ContainerDelete(ctx context.Context, options DeleteOptions) (Container, error)
        ContainerLock(ctx context.Context, options GetOptions) (Container, error)
        ContainerUnlock(ctx context.Context, options GetOptions) (Container, error)
-       ContainerSSH(ctx context.Context, options ContainerSSHOptions) (ContainerSSHConnection, error)
+       ContainerSSH(ctx context.Context, options ContainerSSHOptions) (ConnectionResponse, error)
+       ContainerGatewayTunnel(ctx context.Context, options ContainerGatewayTunnelOptions) (ConnectionResponse, error)
        ContainerRequestCreate(ctx context.Context, options CreateOptions) (ContainerRequest, error)
        ContainerRequestUpdate(ctx context.Context, options UpdateOptions) (ContainerRequest, error)
        ContainerRequestGet(ctx context.Context, options GetOptions) (ContainerRequest, error)
index 466221fe19316ef908db578f005dfa20a0722259..45f92017c4d02be4a6d4063439ea8cd515dbd268 100644 (file)
@@ -38,6 +38,8 @@ type Container struct {
        RuntimeToken              string                 `json:"runtime_token"`
        AuthUUID                  string                 `json:"auth_uuid"`
        Log                       string                 `json:"log"`
+       Cost                      float64                `json:"cost"`
+       SubrequestsCost           float64                `json:"subrequests_cost"`
 }
 
 // ContainerRequest is an arvados#container_request resource.
@@ -77,6 +79,7 @@ type ContainerRequest struct {
        ContainerCount          int                    `json:"container_count"`
        OutputStorageClasses    []string               `json:"output_storage_classes"`
        OutputProperties        map[string]interface{} `json:"output_properties"`
+       CumulativeCost          float64                `json:"cumulative_cost"`
 }
 
 // Mount is special behavior to attach to a filesystem path or device.
index 00c98d572ea010e2db43ca1c62bfa0eb341b1e7d..897ae434e14bd0a0392d041a125a598b2c1d8b34 100644 (file)
@@ -14,14 +14,17 @@ import (
        "github.com/sirupsen/logrus"
 )
 
-func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+func (cresp ConnectionResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+       defer cresp.Conn.Close()
        hj, ok := w.(http.Hijacker)
        if !ok {
                http.Error(w, "ResponseWriter does not support connection upgrade", http.StatusInternalServerError)
                return
        }
        w.Header().Set("Connection", "upgrade")
-       w.Header().Set("Upgrade", "ssh")
+       for k, v := range cresp.Header {
+               w.Header()[k] = v
+       }
        w.WriteHeader(http.StatusSwitchingProtocols)
        conn, bufrw, err := hj.Hijack()
        if err != nil {
@@ -31,44 +34,46 @@ func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http
        defer conn.Close()
 
        var bytesIn, bytesOut int64
+       ctx, cancel := context.WithCancel(req.Context())
        var wg sync.WaitGroup
-       ctx, cancel := context.WithCancel(context.Background())
        wg.Add(1)
        go func() {
                defer wg.Done()
                defer cancel()
-               n, err := io.CopyN(conn, sshconn.Bufrw, int64(sshconn.Bufrw.Reader.Buffered()))
+               n, err := io.CopyN(conn, cresp.Bufrw, int64(cresp.Bufrw.Reader.Buffered()))
                bytesOut += n
                if err == nil {
-                       n, err = io.Copy(conn, sshconn.Conn)
+                       n, err = io.Copy(conn, cresp.Conn)
                        bytesOut += n
                }
                if err != nil {
-                       ctxlog.FromContext(req.Context()).WithError(err).Error("error copying downstream")
+                       ctxlog.FromContext(ctx).WithError(err).Error("error copying downstream")
                }
        }()
        wg.Add(1)
        go func() {
                defer wg.Done()
                defer cancel()
-               n, err := io.CopyN(sshconn.Conn, bufrw, int64(bufrw.Reader.Buffered()))
+               n, err := io.CopyN(cresp.Conn, bufrw, int64(bufrw.Reader.Buffered()))
                bytesIn += n
                if err == nil {
-                       n, err = io.Copy(sshconn.Conn, conn)
+                       n, err = io.Copy(cresp.Conn, conn)
                        bytesIn += n
                }
                if err != nil {
-                       ctxlog.FromContext(req.Context()).WithError(err).Error("error copying upstream")
+                       ctxlog.FromContext(ctx).WithError(err).Error("error copying upstream")
                }
        }()
        <-ctx.Done()
-       if sshconn.Logger != nil {
-               go func() {
-                       wg.Wait()
-                       sshconn.Logger.WithFields(logrus.Fields{
+       go func() {
+               // Wait for both io.Copy goroutines to finish and increment
+               // their byte counters.
+               wg.Wait()
+               if cresp.Logger != nil {
+                       cresp.Logger.WithFields(logrus.Fields{
                                "bytesIn":  bytesIn,
                                "bytesOut": bytesOut,
                        }).Info("closed connection")
-               }()
-       }
+               }
+       }()
 }
index f49d29ce2b8cbbc7b9a8c564e20d86673588f58a..d6da579d6b9ce1323dfbeb9b50f993232822379a 100644 (file)
@@ -109,9 +109,13 @@ func (as *APIStub) ContainerUnlock(ctx context.Context, options arvados.GetOptio
        as.appendCall(ctx, as.ContainerUnlock, options)
        return arvados.Container{}, as.Error
 }
-func (as *APIStub) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ContainerSSHConnection, error) {
+func (as *APIStub) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ConnectionResponse, error) {
        as.appendCall(ctx, as.ContainerSSH, options)
-       return arvados.ContainerSSHConnection{}, as.Error
+       return arvados.ConnectionResponse{}, as.Error
+}
+func (as *APIStub) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (arvados.ConnectionResponse, error) {
+       as.appendCall(ctx, as.ContainerGatewayTunnel, options)
+       return arvados.ConnectionResponse{}, as.Error
 }
 func (as *APIStub) ContainerRequestCreate(ctx context.Context, options arvados.CreateOptions) (arvados.ContainerRequest, error) {
        as.appendCall(ctx, as.ContainerRequestCreate, options)
index a44d42b6ac7cd7a4156ab3b8bc4f72f86060e3a0..998481ab661105b68b0247d1a82c09211fa0d66e 100644 (file)
@@ -827,7 +827,7 @@ class RichCollectionBase(CollectionBase):
             self.set_committed(False)
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
-            item.remove(pathcomponents[1])
+            item.remove(pathcomponents[1], recursive=recursive)
 
     def _clonefrom(self, source):
         for k,v in listitems(source):
index 28cb0953f3c42a348a623a4f3f54aadc27d7958c..e32d385f73fc0849c30d4a730cb3b83c6a4425c6 100644 (file)
@@ -831,6 +831,7 @@ def setup_config():
                     "JobsAPI": {
                         "GitInternalDir": os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'internal.git'),
                     },
+                    "LocalKeepBlobBuffersPerVCPU": 0,
                     "SupportedDockerImageFormats": {"v1": {}},
                     "ShellAccess": {
                         "Admin": True,
index 5cf4993b2f3804d22209ae16db41fc7bc505efd8..b4849c21ff30909ab3c7f9bb0af8040582e82a58 100644 (file)
@@ -969,6 +969,20 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         with self.assertRaises(arvados.errors.ArgumentError):
             c.remove("")
 
+    def test_remove_recursive(self):
+        c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:a/b/c/d/efg.txt 0:10:xyz.txt\n')
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:xyz.txt\n./a/b/c/d 781e5e245d69b566979b86e28d23f2c7+10 0:10:efg.txt\n", c.portable_manifest_text())
+        self.assertIn("a", c)
+        self.assertEqual(1, len(c["a"].keys()))
+        # cannot remove non-empty directory with default recursive=False
+        with self.assertRaises(OSError):
+            c.remove("a/b")
+        with self.assertRaises(OSError):
+            c.remove("a/b/c/d")
+        c.remove("a/b", recursive=True)
+        self.assertEqual(0, len(c["a"].keys()))
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:xyz.txt\n./a d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", c.portable_manifest_text())
+
     def test_find(self):
         c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n')
         self.assertIs(c.find("."), c)
index 3473c7e4e0c361e3c594569f83178e2ea18ebed2..e9bc006a36664bb1a929bf72dccc9730fa9b049c 100644 (file)
@@ -263,6 +263,9 @@ class Arvados::V1::GroupsController < ApplicationController
     included_by_uuid = {}
 
     seen_last_class = false
+    error_by_class = {}
+    any_success = false
+
     klasses.each do |klass|
       # check if current klass is same as params['last_object_class']
       seen_last_class = true if((params['count'].andand.==('none')) and
@@ -318,7 +321,19 @@ class Arvados::V1::GroupsController < ApplicationController
       # Adjust the limit based on number of objects fetched so far
       klass_limit = limit_all - all_objects.count
       @limit = klass_limit
-      apply_where_limit_order_params klass
+
+      begin
+        apply_where_limit_order_params klass
+      rescue ArgumentError => e
+        if e.inspect =~ /Invalid attribute '.+' for operator '.+' in filter/ or
+          e.inspect =~ /Invalid attribute '.+' for subproperty filter/
+          error_by_class[klass.name] = e
+          next
+        end
+        raise
+      else
+        any_success = true
+      end
 
       # This actually fetches the objects
       klass_object_list = object_list(model_class: klass)
@@ -349,6 +364,14 @@ class Arvados::V1::GroupsController < ApplicationController
       end
     end
 
+    # Only error out when every searchable object type errored out
+    if !any_success && error_by_class.size > 0
+      error_msg = error_by_class.collect do |klass, err|
+        "#{err} on object type #{klass}"
+      end.join("\n")
+      raise ArgumentError.new(error_msg)
+    end
+
     if params["include"]
       @extra_included = included_by_uuid.values
     end
index 08f87bbdb13b3a4ae21ce4d26b694ecc2dd57cef..3e3f73b838dab5f4809bef12cd8c3d3dc1b02b08 100644 (file)
@@ -83,6 +83,8 @@ class Container < ArvadosModel
     t.add :interactive_session_started
     t.add :output_storage_classes
     t.add :output_properties
+    t.add :cost
+    t.add :subrequests_cost
   end
 
   # Supported states for a container
@@ -478,8 +480,9 @@ class Container < ArvadosModel
 
   def validate_change
     permitted = [:state]
-    progress_attrs = [:progress, :runtime_status, :log, :output, :output_properties, :exit_code]
     final_attrs = [:finished_at]
+    progress_attrs = [:progress, :runtime_status, :subrequests_cost, :cost,
+                      :log, :output, :output_properties, :exit_code]
 
     if self.new_record?
       permitted.push(:owner_uuid, :command, :container_image, :cwd,
@@ -498,9 +501,9 @@ class Container < ArvadosModel
       permitted.push :priority
 
     when Running
-      permitted.push :priority, :output_properties, *progress_attrs
+      permitted.push :priority, :output_properties, :gateway_address, *progress_attrs
       if self.state_changed?
-        permitted.push :started_at, :gateway_address
+        permitted.push :started_at
       end
       if !self.interactive_session_started_was
         permitted.push :interactive_session_started
@@ -516,7 +519,7 @@ class Container < ArvadosModel
       when Running
         permitted.push :finished_at, *progress_attrs
       when Queued, Locked
-        permitted.push :finished_at, :log, :runtime_status
+        permitted.push :finished_at, :log, :runtime_status, :cost
       end
 
     else
@@ -719,6 +722,7 @@ class Container < ArvadosModel
               cr.with_lock do
                 leave_modified_by_user_alone do
                   # Use row locking because this increments container_count
+                  cr.cumulative_cost += self.cost + self.subrequests_cost
                   cr.container_uuid = c.uuid
                   cr.save!
                 end
index 911603590586a6e1cbaddb8a2f575940ff0d8cd3..47e2769a8c187ccb642a0514b092816f041d251d 100644 (file)
@@ -33,6 +33,7 @@ class ContainerRequest < ArvadosModel
   serialize :scheduling_parameters, Hash
 
   after_find :fill_container_defaults_after_find
+  after_initialize { @state_was_when_initialized = self.state_was } # see finalize_if_needed
   before_validation :fill_field_defaults, :if => :new_record?
   before_validation :fill_container_defaults
   validates :command, :container_image, :output_path, :cwd, :presence => true
@@ -80,6 +81,7 @@ class ContainerRequest < ArvadosModel
     t.add :use_existing
     t.add :output_storage_classes
     t.add :output_properties
+    t.add :cumulative_cost
   end
 
   # Supported states for a container request
@@ -173,6 +175,30 @@ class ContainerRequest < ArvadosModel
   def finalize!
     container = Container.find_by_uuid(container_uuid)
     if !container.nil?
+      # We don't want to add the container cost if the container was
+      # already finished when this CR was committed. But we are
+      # running in an after_save hook after a lock/reload, so
+      # state_was has already been updated to Committed regardless.
+      # Hence the need for @state_was_when_initialized.
+      if @state_was_when_initialized == Committed
+        # Add the final container cost to our cumulative cost (which
+        # may already be non-zero from previous attempts if
+        # container_count_max > 1).
+        self.cumulative_cost += container.cost + container.subrequests_cost
+      end
+
+      # Add our cumulative cost to the subrequests_cost of the
+      # requesting container, if any.
+      if self.requesting_container_uuid
+        Container.where(
+          uuid: self.requesting_container_uuid,
+          state: Container::Running,
+        ).each do |c|
+          c.subrequests_cost += self.cumulative_cost
+          c.save!
+        end
+      end
+
       update_collections(container: container)
 
       if container.state == Container::Complete
@@ -461,7 +487,7 @@ class ContainerRequest < ArvadosModel
 
     case self.state
     when Committed
-      permitted.push :priority, :container_count_max, :container_uuid
+      permitted.push :priority, :container_count_max, :container_uuid, :cumulative_cost
 
       if self.priority.nil?
         self.errors.add :priority, "cannot be nil"
@@ -478,7 +504,7 @@ class ContainerRequest < ArvadosModel
     when Final
       if self.state_was == Committed
         # "Cancel" means setting priority=0, state=Committed
-        permitted.push :priority
+        permitted.push :priority, :cumulative_cost
 
         if current_user.andand.is_admin
           permitted.push :output_uuid, :log_uuid
diff --git a/services/api/db/migrate/20220804133317_add_cost_to_containers.rb b/services/api/db/migrate/20220804133317_add_cost_to_containers.rb
new file mode 100644 (file)
index 0000000..188187e
--- /dev/null
@@ -0,0 +1,11 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class AddCostToContainers < ActiveRecord::Migration[5.2]
+  def change
+    add_column :containers, :cost, :float, null: false, default: 0
+    add_column :containers, :subrequests_cost, :float, null: false, default: 0
+    add_column :container_requests, :cumulative_cost, :float, null: false, default: 0
+  end
+end
index 525300833e7bf55069687eee5fe14a0ad3b6dd4f..825d5c72b0f0d4ef694f65dd61c41f31cbcd3a88 100644 (file)
@@ -479,7 +479,8 @@ CREATE TABLE public.container_requests (
     secret_mounts jsonb DEFAULT '{}'::jsonb,
     runtime_token text,
     output_storage_classes jsonb DEFAULT '["default"]'::jsonb,
-    output_properties jsonb DEFAULT '{}'::jsonb
+    output_properties jsonb DEFAULT '{}'::jsonb,
+    cumulative_cost double precision DEFAULT 0.0 NOT NULL
 );
 
 
@@ -543,7 +544,9 @@ CREATE TABLE public.containers (
     gateway_address character varying,
     interactive_session_started boolean DEFAULT false NOT NULL,
     output_storage_classes jsonb DEFAULT '["default"]'::jsonb,
-    output_properties jsonb DEFAULT '{}'::jsonb
+    output_properties jsonb DEFAULT '{}'::jsonb,
+    cost double precision DEFAULT 0.0 NOT NULL,
+    subrequests_cost double precision DEFAULT 0.0 NOT NULL
 );
 
 
@@ -3182,5 +3185,5 @@ INSERT INTO "schema_migrations" (version) VALUES
 ('20220401153101'),
 ('20220505112900'),
 ('20220726034131');
-
+('20220804133317');
 
index 2f5b67074a9bdf5b24d3689333d17ee6e98e0745..65c25810acf2e3ef98422a1de3a5c8503e75edfe 100644 (file)
@@ -136,7 +136,7 @@ module RecordFilters
             raise ArgumentError.new("Invalid operator for subproperty search '#{operator}'")
           end
         elsif operator == "exists"
-          if col.type != :jsonb
+          if col.nil? or col.type != :jsonb
             raise ArgumentError.new("Invalid attribute '#{attr}' for operator '#{operator}' in filter")
           end
 
index dd8eeaa7bead1e260d46e5da4142707792edd42a..3916d63c5ed1cce10cca11182b23682db512d8d1 100644 (file)
@@ -236,6 +236,58 @@ class Arvados::V1::FiltersTest < ActionController::TestCase
                  json_response['errors'].join(' '))
   end
 
+  test "groups contents with properties filter succeeds on objects with properties field" do
+    @controller = Arvados::V1::GroupsController.new
+    authorize_with :admin
+    get :contents, params: {
+      filters: [
+        ['properties', 'exists', 'foo'],
+        ['uuid', 'is_a', ["arvados#group","arvados#collection","arvados#containerRequest"]],
+      ]
+    }
+    assert_response 200
+    assert json_response['items'].length == 0
+  end
+
+  # Tests bug #19297
+  test "groups contents with properties filter succeeds on some objects with properties field" do
+    @controller = Arvados::V1::GroupsController.new
+    authorize_with :admin
+    get :contents, params: {
+      filters: [
+        ['properties', 'exists', 'foo'],
+        ['uuid', 'is_a', ["arvados#group","arvados#workflow"]],
+      ]
+    }
+    assert_response 200
+    assert json_response['items'].length == 0
+  end
+
+  # Tests bug #19297
+  test "groups contents with properties filter fails on objects without properties field" do
+    @controller = Arvados::V1::GroupsController.new
+    authorize_with :admin
+    get :contents, params: {
+      filters: [
+        ['properties', 'exists', 'foo'],
+        ['uuid', 'is_a', ["arvados#workflow"]],
+      ]
+    }
+    assert_response 422
+    assert_match(/Invalid attribute 'properties' for operator 'exists'.*on object type Workflow/, json_response['errors'].join(' '))
+  end
+
+  test "groups contents without filters and limit=0, count=none" do
+    @controller = Arvados::V1::GroupsController.new
+    authorize_with :admin
+    get :contents, params: {
+      limit: 0,
+      count: 'none',
+    }
+    assert_response 200
+    assert json_response['items'].length == 0
+  end
+
   test "replication_desired = 2" do
     @controller = Arvados::V1::CollectionsController.new
     authorize_with :admin
index e5c0085184ec5b0f4690b11decfadd1fb82be5b3..e6db412179b663e39696e9c884af584af287ddbf 100644 (file)
@@ -231,11 +231,12 @@ class ContainerRequestTest < ActiveSupport::TestCase
 
     act_as_system_user do
       Container.find_by_uuid(cr.container_uuid).
-        update_attributes!(state: Container::Cancelled)
+        update_attributes!(state: Container::Cancelled, cost: 1.25)
     end
 
     cr.reload
     assert_equal "Final", cr.state
+    assert_equal 1.25, cr.cumulative_cost
     assert_equal users(:active).uuid, cr.modified_by_user_uuid
   end
 
@@ -261,12 +262,14 @@ class ContainerRequestTest < ActiveSupport::TestCase
     log_pdh = 'fa7aeb5140e2848d39b416daeef4ffc5+45'
     act_as_system_user do
       c.update_attributes!(state: Container::Complete,
+                           cost: 1.25,
                            output: output_pdh,
                            log: log_pdh)
     end
 
     cr.reload
     assert_equal "Final", cr.state
+    assert_equal 1.25, cr.cumulative_cost
     assert_equal users(:active).uuid, cr.modified_by_user_uuid
 
     assert_not_nil cr.output_uuid
@@ -788,6 +791,7 @@ class ContainerRequestTest < ActiveSupport::TestCase
     prev_container_uuid = cr.container_uuid
 
     act_as_system_user do
+      c.update_attributes!(cost: 0.5, subrequests_cost: 1.25)
       c.update_attributes!(state: Container::Cancelled)
     end
 
@@ -800,6 +804,9 @@ class ContainerRequestTest < ActiveSupport::TestCase
 
     c = act_as_system_user do
       c = Container.find_by_uuid(cr.container_uuid)
+      c.update_attributes!(state: Container::Locked)
+      c.update_attributes!(state: Container::Running)
+      c.update_attributes!(cost: 0.125)
       c.update_attributes!(state: Container::Cancelled)
       c
     end
@@ -809,6 +816,7 @@ class ContainerRequestTest < ActiveSupport::TestCase
     assert_equal "Final", cr.state
     assert_equal prev_container_uuid, cr.container_uuid
     assert_not_equal cr2.container_uuid, cr.container_uuid
+    assert_equal 1.875, cr.cumulative_cost
   end
 
   test "Retry on container cancelled with runtime_token" do
@@ -1511,4 +1519,63 @@ class ContainerRequestTest < ActiveSupport::TestCase
     end
   end
 
+  test "Cumulative cost includes retried attempts but not reused containers" do
+    set_user_from_auth :active
+    cr = create_minimal_req!(priority: 5, state: "Committed", container_count_max: 3)
+    c = Container.find_by_uuid cr.container_uuid
+    act_as_system_user do
+      c.update_attributes!(state: Container::Locked)
+      c.update_attributes!(state: Container::Running)
+      c.update_attributes!(state: Container::Cancelled, cost: 3)
+    end
+    cr.reload
+    assert_equal 3, cr.cumulative_cost
+
+    c = Container.find_by_uuid cr.container_uuid
+    lock_and_run c
+    c.reload
+    assert_equal 0, c.subrequests_cost
+
+    # cr2 is a child/subrequest
+    cr2 = with_container_auth(c) do
+      create_minimal_req!(priority: 10, state: "Committed", container_count_max: 1, command: ["echo", "foo2"])
+    end
+    assert_equal c.uuid, cr2.requesting_container_uuid
+    c2 = Container.find_by_uuid cr2.container_uuid
+    act_as_system_user do
+      c2.update_attributes!(state: Container::Locked)
+      c2.update_attributes!(state: Container::Running)
+      logc = Collection.new(owner_uuid: system_user_uuid,
+                            manifest_text: ". ef772b2f28e2c8ca84de45466ed19ee9+7815 0:0:arv-mount.txt\n")
+      logc.save!
+      c2.update_attributes!(state: Container::Complete,
+                            exit_code: 0,
+                            output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
+                            log: logc.portable_data_hash,
+                            cost: 7)
+    end
+    c.reload
+    assert_equal 7, c.subrequests_cost
+
+    # cr3 is an identical child/subrequest, will reuse c2
+    cr3 = with_container_auth(c) do
+      create_minimal_req!(priority: 10, state: "Committed", container_count_max: 1, command: ["echo", "foo2"])
+    end
+    assert_equal c.uuid, cr3.requesting_container_uuid
+    c3 = Container.find_by_uuid cr3.container_uuid
+    assert_equal c2.uuid, c3.uuid
+    assert_equal Container::Complete, c3.state
+    c.reload
+    assert_equal 7, c.subrequests_cost
+
+    act_as_system_user do
+      c.update_attributes!(state: Container::Complete, exit_code: 0, cost: 9)
+    end
+
+    c.reload
+    assert_equal 7, c.subrequests_cost
+    cr.reload
+    assert_equal 3+7+9, cr.cumulative_cost
+  end
+
 end
index bcf99da2e3442ba088d9effbe80a633e6467f857..a4c0ce17926092ec451583404a21f56374f79176 100644 (file)
@@ -958,6 +958,7 @@ class ContainerTest < ActiveSupport::TestCase
         Thread.current[:user] = auth.user
       end
 
+      assert c.update_attributes(gateway_address: "127.0.0.1:9")
       assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
       assert c.update_attributes(runtime_status: {'warning' => 'something happened'})
       assert c.update_attributes(progress: 0.5)
index c31d7997522fa1caa73507a009d680b3835a2f46..c774584d683c338e70629320d9d602b9fea30814 100644 (file)
@@ -7,6 +7,8 @@ package dispatchslurm
 
 import (
        "context"
+       "crypto/hmac"
+       "crypto/sha256"
        "fmt"
        "log"
        "math"
@@ -213,7 +215,12 @@ func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []s
        crArgs := append([]string(nil), crunchRunCommand...)
        crArgs = append(crArgs, "--runtime-engine="+disp.cluster.Containers.RuntimeEngine)
        crArgs = append(crArgs, container.UUID)
-       crScript := strings.NewReader(execScript(crArgs))
+
+       h := hmac.New(sha256.New, []byte(disp.cluster.SystemRootToken))
+       fmt.Fprint(h, container.UUID)
+       authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+       crScript := strings.NewReader(execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret}))
 
        sbArgs, err := disp.sbatchArgs(container)
        if err != nil {
index fb16e593e5c5648720452fd49194910a4b2021b5..d0bfbc4a929dd8067a8e3e3e519b17dc0777f475 100644 (file)
@@ -8,8 +8,14 @@ import (
        "strings"
 )
 
-func execScript(args []string) string {
-       s := "#!/bin/sh\nexec"
+func execScript(args []string, env map[string]string) string {
+       s := "#!/bin/sh\n"
+       for k, v := range env {
+               s += k + `='`
+               s += strings.Replace(v, `'`, `'\''`, -1)
+               s += `' `
+       }
+       s += `exec`
        for _, w := range args {
                s += ` '`
                s += strings.Replace(w, `'`, `'\''`, -1)
index 00d70190dd043416302c38fc526dc551dc08f687..bba9a05755cb36ec9848d11b17bc0187657e87d3 100644 (file)
@@ -23,6 +23,7 @@ func (s *ScriptSuite) TestExecScript(c *C) {
                {[]string{`foo"`, "'waz 'qux\n"}, `exec 'foo"' ''\''waz '\''qux` + "\n" + `'`},
        } {
                c.Logf("%+v -> %+v", test.args, test.script)
-               c.Check(execScript(test.args), Equals, "#!/bin/sh\n"+test.script+"\n")
+               c.Check(execScript(test.args, nil), Equals, "#!/bin/sh\n"+test.script+"\n")
        }
+       c.Check(execScript([]string{"sh", "-c", "echo $foo"}, map[string]string{"foo": "b'ar"}), Equals, "#!/bin/sh\nfoo='b'\\''ar' exec 'sh' '-c' 'echo $foo'\n")
 }
index 07c9952f906d7e57dfca21d0007c01d96f42294b..63a994096bc8c4e1c6f974592c5ee4cf2b6f364f 100644 (file)
@@ -151,7 +151,11 @@ func (bsm *BlockStateMap) GetConfirmedReplication(blkids []arvados.SizedDigest,
                for _, c := range classes {
                        perclass[c] = 0
                }
-               for _, r := range bsm.get(blkid).Replicas {
+               bs, ok := bsm.entries[blkid]
+               if !ok {
+                       return 0
+               }
+               for _, r := range bs.Replicas {
                        total += r.KeepMount.Replication
                        mntclasses := r.KeepMount.StorageClasses
                        if len(mntclasses) == 0 {
index 8a58be288ff1832a3799224510d04d5d581d2455..c6076bbd3d526c144849cd8890832e08c7df3b65 100644 (file)
@@ -5,6 +5,7 @@
 package keepbalance
 
 import (
+       "sync"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -92,3 +93,25 @@ func (s *confirmedReplicationSuite) TestBlocksOnMultipleMounts(c *check.C) {
        n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, nil)
        c.Check(n, check.Equals, 4)
 }
+
+func (s *confirmedReplicationSuite) TestConcurrency(c *check.C) {
+       var wg sync.WaitGroup
+       for i := 1000; i < 1256; i++ {
+               i := i
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(i), knownBlkid(i)}, []string{"default"})
+                       c.Check(n, check.Equals, 0)
+               }()
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10)}, []string{"default"})
+                       c.Check(n, check.Equals, 1)
+                       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20)}, []string{"default"})
+                       c.Check(n, check.Equals, 2)
+               }()
+       }
+       wg.Wait()
+}
index ccb01bdd10c5fb7f777d868a2aeefa175d494ace..d7a3fd981d4f03877e57b0426e739941411cf4a3 100644 (file)
@@ -152,7 +152,7 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
        // Use about 1 goroutine per 2 CPUs. Based on experiments with
        // a 2-core host, using more concurrent database
        // calls/transactions makes this process slower, not faster.
-       for i := 0; i < runtime.NumCPU()+1/2; i++ {
+       for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
                wg.Add(1)
                goSendErr(errs, func() error {
                        defer wg.Done()
index 1f1f509860bb9950d95e5d9c566e9e57f9d4df36..3a1d9acde7e7d33208958e467931aef2b1474853 100644 (file)
@@ -909,17 +909,18 @@ func (h *handler) logUploadOrDownload(
                collection, filepath = h.determineCollection(fs, filepath)
        }
        if collection != nil {
-               log = log.WithField("collection_uuid", collection.UUID).
-                       WithField("collection_file_path", filepath)
-               props["collection_uuid"] = collection.UUID
+               log = log.WithField("collection_file_path", filepath)
                props["collection_file_path"] = filepath
-               // h.determineCollection populates the collection_uuid prop with the PDH, if
-               // this collection is being accessed via PDH. In that case, blank the
-               // collection_uuid field so that consumers of the log entries can rely on it
-               // being a UUID, or blank. The PDH remains available via the
-               // portable_data_hash property.
-               if props["collection_uuid"] == collection.PortableDataHash {
-                       props["collection_uuid"] = ""
+               // h.determineCollection populates the collection_uuid
+               // prop with the PDH, if this collection is being
+               // accessed via PDH. For logging, we use a different
+               // field depending on whether it's a UUID or PDH.
+               if len(collection.UUID) > 32 {
+                       log = log.WithField("portable_data_hash", collection.UUID)
+                       props["portable_data_hash"] = collection.UUID
+               } else {
+                       log = log.WithField("collection_uuid", collection.UUID)
+                       props["collection_uuid"] = collection.UUID
                }
        }
        if r.Method == "PUT" || r.Method == "POST" {
@@ -958,29 +959,27 @@ func (h *handler) logUploadOrDownload(
 }
 
 func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string) (*arvados.Collection, string) {
-       segments := strings.Split(path, "/")
-       var i int
-       for i = 0; i < len(segments); i++ {
-               dir := append([]string{}, segments[0:i]...)
-               dir = append(dir, ".arvados#collection")
-               f, err := fs.OpenFile(strings.Join(dir, "/"), os.O_RDONLY, 0)
-               if f != nil {
-                       defer f.Close()
-               }
+       target := strings.TrimSuffix(path, "/")
+       for {
+               fi, err := fs.Stat(target)
                if err != nil {
-                       if !os.IsNotExist(err) {
+                       return nil, ""
+               }
+               switch src := fi.Sys().(type) {
+               case *arvados.Collection:
+                       return src, strings.TrimPrefix(path[len(target):], "/")
+               case *arvados.Group:
+                       return nil, ""
+               default:
+                       if _, ok := src.(error); ok {
                                return nil, ""
                        }
-                       continue
                }
-               // err is nil so we found it.
-               decoder := json.NewDecoder(f)
-               var collection arvados.Collection
-               err = decoder.Decode(&collection)
-               if err != nil {
+               // Try parent
+               cut := strings.LastIndexByte(target, '/')
+               if cut < 0 {
                        return nil, ""
                }
-               return &collection, strings.Join(segments[i:], "/")
+               target = target[:cut]
        }
-       return nil, ""
 }
index 20f334166e419ee806b608ac37fd3a27b10dca82..f072fedb40757f20af87ae1e21da86503960aad9 100644 (file)
@@ -19,7 +19,7 @@ DEPLOY_USER=root
 # installer.sh will log in to each of these nodes and then provision
 # it for the specified roles.
 NODES=(
-  [localhost]=api,controller,websocket,dispatcher,keepbalance,keepstore,keepproxy,keepweb,workbench,workbench2,webshell
+  [localhost]=api,controller,websocket,dispatcher,keepbalance,keepstore,keepproxy,keepweb,workbench,workbench2,webshell,shell
 )
 
 # External ports used by the Arvados services
index a68450094161accb43ef472def621e15b20b2d79..fdb10cf63632fd0ec2a6df84e55552e831a66c5d 100644 (file)
@@ -19,7 +19,7 @@ DEPLOY_USER=root
 # installer.sh will log in to each of these nodes and then provision
 # it for the specified roles.
 NODES=(
-  [localhost]=api,controller,websocket,dispatcher,keepbalance,keepstore,keepproxy,keepweb,workbench,workbench2,webshell
+  [localhost]=api,controller,websocket,dispatcher,keepbalance,keepstore,keepproxy,keepweb,workbench,workbench2,webshell,shell
 )
 
 # Set this value when installing a cluster in a single host with a single
index f4660be370990302cadbb9b3e11d8f2a44f5de10..638e5de800169bf9c8f3ea5641d61cbdff2e68f1 100755 (executable)
@@ -625,7 +625,7 @@ if [ -z "${ROLES}" ]; then
     echo "extra_custom_certs_dir: /srv/salt/certs" > ${P_DIR}/extra_custom_certs.sls
     echo "extra_custom_certs:" >> ${P_DIR}/extra_custom_certs.sls
 
-    for c in controller websocket workbench workbench2 webshell keepweb keepproxy; do
+    for c in controller websocket workbench workbench2 webshell keepweb keepproxy shell; do
       # Are we in a single-host-single-hostname env?
       if [ "${USE_SINGLE_HOSTNAME}" = "yes" ]; then
         # Are we in a single-host-single-hostname env?