19166: Merge branch 'main' 19166-gateway-tunnel
author Tom Clegg <tom@curii.com>
Mon, 25 Jul 2022 14:56:27 +0000 (10:56 -0400)
committer Tom Clegg <tom@curii.com>
Mon, 25 Jul 2022 14:56:27 +0000 (10:56 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

29 files changed:
cmd/arvados-client/container_gateway.go
cmd/arvados-client/container_gateway_test.go
cmd/arvados-server/cmd.go
doc/_config.yml
doc/admin/config-urls.html.textile.liquid
doc/architecture/hpc.html.textile.liquid [new file with mode: 0644]
go.mod
go.sum
lib/controller/federation/conn.go
lib/controller/localdb/conn.go
lib/controller/localdb/container_gateway.go
lib/controller/localdb/container_gateway_test.go
lib/controller/proxy.go
lib/controller/router/request.go
lib/controller/router/router.go
lib/controller/rpc/conn.go
lib/crunchrun/container_gateway.go
lib/crunchrun/crunchrun.go
lib/crunchrun/integration_test.go
lib/lsf/dispatch.go
sdk/go/arvados/api.go
sdk/go/arvados/container_gateway.go
sdk/go/arvadostest/api.go
sdk/python/tests/run_test_server.py
services/api/app/models/container.rb
services/api/test/unit/container_test.rb
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/script.go
services/crunch-dispatch-slurm/script_test.go

index aca6c5b797fa4ec3b036ee8300ae3f4fcbe5e885..55f8c33bc70c77d31f13f16bb924ee4c2a6a1613 100644 (file)
@@ -160,7 +160,9 @@ Options:
                fmt.Fprintf(stderr, "target UUID is not a container or container request UUID: %s\n", targetUUID)
                return 1
        }
-       sshconn, err := rpcconn.ContainerSSH(context.TODO(), arvados.ContainerSSHOptions{
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       sshconn, err := rpcconn.ContainerSSH(ctx, arvados.ContainerSSHOptions{
                UUID:          targetUUID,
                DetachKeys:    *detachKeys,
                LoginUsername: loginUsername,
@@ -176,7 +178,6 @@ Options:
                return 0
        }
 
-       ctx, cancel := context.WithCancel(context.Background())
        go func() {
                defer cancel()
                _, err := io.Copy(stdout, sshconn.Conn)
index f4a140c4069a9f0daa01d2263acb350ff604854c..743b91d69bde058e78183faf50a0965b56d4f7b6 100644 (file)
@@ -25,6 +25,7 @@ import (
        "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        check "gopkg.in/check.v1"
 )
@@ -53,6 +54,7 @@ func (s *ClientSuite) TestShellGateway(c *check.C) {
                ContainerUUID: uuid,
                Address:       "0.0.0.0:0",
                AuthSecret:    authSecret,
+               Log:           ctxlog.TestLogger(c),
                // Just forward connections to localhost instead of a
                // container, so we can test without running a
                // container.
@@ -86,6 +88,13 @@ func (s *ClientSuite) TestShellGateway(c *check.C) {
        cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
        cmd.Stdout = &stdout
        cmd.Stderr = &stderr
+       stdin, err := cmd.StdinPipe()
+       c.Assert(err, check.IsNil)
+       go fmt.Fprintln(stdin, "data appears on stdin, but stdin does not close; cmd should exit anyway, not hang")
+       time.AfterFunc(5*time.Second, func() {
+               c.Errorf("timed out -- remote end is probably hung waiting for us to close stdin")
+               stdin.Close()
+       })
        c.Check(cmd.Run(), check.IsNil)
        c.Check(stdout.String(), check.Equals, "ok\n")
 
index d9c41ca587b1194415e44377267d565c4ce4eeb5..438ca206daa06dbb9f5211fce8bf57ed788cf6a0 100644 (file)
@@ -28,6 +28,7 @@ import (
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/health"
+       dispatchslurm "git.arvados.org/arvados.git/services/crunch-dispatch-slurm"
        "git.arvados.org/arvados.git/services/githttpd"
        keepbalance "git.arvados.org/arvados.git/services/keep-balance"
        keepweb "git.arvados.org/arvados.git/services/keep-web"
@@ -53,6 +54,7 @@ var (
                "crunch-run":         crunchrun.Command,
                "dispatch-cloud":     dispatchcloud.Command,
                "dispatch-lsf":       lsf.DispatchCommand,
+               "dispatch-slurm":     dispatchslurm.Command,
                "git-httpd":          githttpd.Command,
                "health":             healthCommand,
                "install":            install.Command,
index d2bb7e797582a8c2a98c850face5442b9e07bfdb..148e1a166e0ac6d96499969e54cb7e5c0c1a3c1d 100644 (file)
@@ -161,6 +161,7 @@ navbar:
     - Computation with Crunch:
       - api/execution.html.textile.liquid
       - architecture/dispatchcloud.html.textile.liquid
+      - architecture/hpc.html.textile.liquid
       - architecture/singularity.html.textile.liquid
     - Other:
       - api/permission-model.html.textile.liquid
index 01c30f0e0eb88eecabc0269cabd40c3aabb07892..500e0d8c8c13aea9a73a9c2a00e9a7e87b381002 100644 (file)
@@ -28,7 +28,7 @@ h2. Overview
 table(table table-bordered table-condensed).
 |_.Service     |_.ExternalURL required? |_.InternalURLs required?|_.InternalURLs must be reachable from other cluster nodes?|_.Note|
 |railsapi       |no                     |yes|no ^1^|InternalURLs only used by Controller|
-|controller     |yes                    |yes|no ^2^|InternalURLs only used by reverse proxy (e.g. Nginx)|
+|controller     |yes                    |yes|yes ^2,4^|InternalURLs used by reverse proxy and container shell connections|
 |arvados-dispatch-cloud|no              |yes|no ^3^|InternalURLs only used to expose Prometheus metrics|
 |arvados-dispatch-lsf|no                |yes|no ^3^|InternalURLs only used to expose Prometheus metrics|
 |git-http       |yes                    |yes|no ^2^|InternalURLs only used by reverse proxy (e.g. Nginx)|
@@ -45,6 +45,7 @@ table(table table-bordered table-condensed).
 ^1^ If @Controller@ runs on a different host than @RailsAPI@, the @InternalURLs@ will need to be reachable from the host that runs @Controller@.
 ^2^ If the reverse proxy (e.g. Nginx) does not run on the same host as the Arvados service it fronts, the @InternalURLs@ will need to be reachable from the host that runs the reverse proxy.
 ^3^ If the Prometheus metrics are not collected from the same machine that runs the service, the @InternalURLs@ will need to be reachable from the host that collects the metrics.
+^4^ If dispatching containers to HPC (Slurm/LSF) and there are multiple @Controller@ services, they must be able to connect to one another using their InternalURLs; otherwise, the "tunnel connections":{{site.baseurl}}/architecture/hpc.html that enable "container shell access":{{site.baseurl}}/install/container-shell-access.html will not work.
 
 When @InternalURLs@ do not need to be reachable from other nodes, it is most secure to use loopback addresses as @InternalURLs@, e.g. @http://127.0.0.1:9005@.
 
diff --git a/doc/architecture/hpc.html.textile.liquid b/doc/architecture/hpc.html.textile.liquid
new file mode 100644 (file)
index 0000000..03a4649
--- /dev/null
@@ -0,0 +1,29 @@
+---
+layout: default
+navsection: architecture
+title: Dispatching containers to HPC
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Arvados can be configured to run containers on an HPC cluster using Slurm or LSF, as an alternative to "dispatching to cloud VMs":dispatchcloud.html.
+
+In this configuration, the appropriate Arvados dispatcher service -- @crunch-dispatch-slurm@ or @arvados-dispatch-lsf@ -- picks up each container as it appears in the Arvados queue and submits a short shell script as a batch job to the HPC job queue. The shell script executes the @crunch-run@ container supervisor, which retrieves the container specification from the Arvados controller, starts an @arv-mount@ process, runs the container using @docker exec@ or @singularity exec@, and sends updates (logs, outputs, exit code, etc.) back to the Arvados controller.
+
+h2. Container communication channel (reverse https tunnel)
+
+The @crunch-run@ program runs a gateway server to support the “container shell” feature. However, depending on the site's network topology, the Arvados controller may not be able to connect directly to the compute node where a given @crunch-run@ process is running.
+
+To work around this, in the HPC configuration @crunch-run@ connects to the Arvados controller at startup and sets up a multiplexed tunnel. This allows the controller process to connect to the @crunch-run@ gateway server without initiating a connection to the compute node, or even knowing the compute node's IP address.
+
+This means that when a client requests a container shell connection, the traffic goes through two or three servers:
+# The client connects to a controller host C1.
+# If the multiplexed tunnel is connected to a different controller host C2, then C1 proxies the incoming request to C2, using C2's InternalURL.
+# The controller host (C1 or C2) uses the multiplexed tunnel to connect to crunch-run's container gateway.
+
+h2. Scaling
+
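+The @API.MaxConcurrentRequests@ configuration should not be set too low. Each running container holds a long-lived tunnel connection to a controller process, so the limit must leave room for these connections in addition to normal client traffic; otherwise, the tunnel connections can starve other clients.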
diff --git a/go.mod b/go.mod
index 525bae11ee49be2fd83fe7d717384eaf580550dd..aced60dbc4e0ee83281ba8e03a8e0511a02ab220 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -72,6 +72,7 @@ require (
        github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
        github.com/golang/protobuf v1.5.0 // indirect
        github.com/googleapis/gax-go/v2 v2.0.5 // indirect
+       github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
        github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
        github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
        github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5 // indirect
diff --git a/go.sum b/go.sum
index 82a8d83d7e2be701f3ab3f9b41312883cb62f3ad..422a891e00ef26add10565538607630d2770863d 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -433,6 +433,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
 github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 h1:xixZ2bWeofWV68J+x6AzmKuVM/JWCQwkWm6GW/MUR6I=
+github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
index d9f587852d149da46cd49ddbfc9dd46095fe180e..ffb150bf26aa148b511f4bbde98305469ffef5df 100644 (file)
@@ -375,10 +375,14 @@ func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOption
        return conn.chooseBackend(options.UUID).ContainerUnlock(ctx, options)
 }
 
-func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ContainerSSHConnection, error) {
+func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ConnectionResponse, error) {
        return conn.chooseBackend(options.UUID).ContainerSSH(ctx, options)
 }
 
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (arvados.ConnectionResponse, error) {
+       return conn.chooseBackend(options.UUID).ContainerGatewayTunnel(ctx, options)
+}
+
 func (conn *Conn) ContainerRequestList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerRequestList, error) {
        return conn.generated_ContainerRequestList(ctx, options)
 }
index 104cfe28f5e0cccfb9cf785955229b4f3b297fdf..a36822ad6b1f5df1f73ffbc3536d76a7215f1817 100644 (file)
@@ -11,6 +11,7 @@ import (
        "net/http"
        "os"
        "strings"
+       "sync"
        "time"
 
        "git.arvados.org/arvados.git/lib/controller/railsproxy"
@@ -18,6 +19,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "github.com/hashicorp/yamux"
        "github.com/sirupsen/logrus"
 )
 
@@ -31,6 +33,8 @@ type Conn struct {
        lastVocabularyRefreshCheck time.Time
        lastVocabularyError        error
        loginController
+       gwTunnels     map[string]*yamux.Session
+       gwTunnelsLock sync.Mutex
 }
 
 func NewConn(cluster *arvados.Cluster) *Conn {
index 3b40eccaff68c138e498b9ec497f5f704e249f64..77c5182e9cd924460b81a39770f647baefb3af19 100644 (file)
@@ -6,21 +6,34 @@ package localdb
 
 import (
        "bufio"
+       "bytes"
        "context"
        "crypto/hmac"
        "crypto/sha256"
+       "crypto/subtle"
        "crypto/tls"
        "crypto/x509"
        "errors"
        "fmt"
+       "io"
+       "io/ioutil"
+       "net"
        "net/http"
        "net/url"
        "strings"
 
+       "git.arvados.org/arvados.git/lib/controller/rpc"
+       "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/auth"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "github.com/hashicorp/yamux"
+)
+
+var (
+       forceProxyForTest       = false
+       forceInternalURLForTest *arvados.URL
 )
 
 // ContainerSSH returns a connection to the SSH server in the
@@ -29,51 +42,112 @@ import (
 //
 // If the returned error is nil, the caller is responsible for closing
 // sshconn.Conn.
-func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOptions) (sshconn arvados.ContainerSSHConnection, err error) {
+func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOptions) (sshconn arvados.ConnectionResponse, err error) {
        user, err := conn.railsProxy.UserGetCurrent(ctx, arvados.GetOptions{})
        if err != nil {
-               return
+               return sshconn, err
        }
        ctr, err := conn.railsProxy.ContainerGet(ctx, arvados.GetOptions{UUID: opts.UUID})
        if err != nil {
-               return
+               return sshconn, err
        }
        ctxRoot := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{conn.cluster.SystemRootToken}})
        if !user.IsAdmin || !conn.cluster.Containers.ShellAccess.Admin {
                if !conn.cluster.Containers.ShellAccess.User {
-                       err = httpserver.ErrorWithStatus(errors.New("shell access is disabled in config"), http.StatusServiceUnavailable)
-                       return
+                       return sshconn, httpserver.ErrorWithStatus(errors.New("shell access is disabled in config"), http.StatusServiceUnavailable)
                }
-               var crs arvados.ContainerRequestList
-               crs, err = conn.railsProxy.ContainerRequestList(ctxRoot, arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"container_uuid", "=", opts.UUID}}})
+               crs, err := conn.railsProxy.ContainerRequestList(ctxRoot, arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"container_uuid", "=", opts.UUID}}})
                if err != nil {
-                       return
+                       return sshconn, err
                }
                for _, cr := range crs.Items {
                        if cr.ModifiedByUserUUID != user.UUID {
-                               err = httpserver.ErrorWithStatus(errors.New("permission denied: container is associated with requests submitted by other users"), http.StatusForbidden)
-                               return
+                               return sshconn, httpserver.ErrorWithStatus(errors.New("permission denied: container is associated with requests submitted by other users"), http.StatusForbidden)
                        }
                }
                if crs.ItemsAvailable != len(crs.Items) {
-                       err = httpserver.ErrorWithStatus(errors.New("incomplete response while checking permission"), http.StatusInternalServerError)
-                       return
+                       return sshconn, httpserver.ErrorWithStatus(errors.New("incomplete response while checking permission"), http.StatusInternalServerError)
                }
        }
 
-       switch ctr.State {
-       case arvados.ContainerStateQueued, arvados.ContainerStateLocked:
-               err = httpserver.ErrorWithStatus(fmt.Errorf("container is not running yet (state is %q)", ctr.State), http.StatusServiceUnavailable)
-               return
-       case arvados.ContainerStateRunning:
-               if ctr.GatewayAddress == "" {
-                       err = httpserver.ErrorWithStatus(errors.New("container is running but gateway is not available -- installation problem or feature not supported"), http.StatusServiceUnavailable)
-                       return
+       conn.gwTunnelsLock.Lock()
+       tunnel := conn.gwTunnels[opts.UUID]
+       conn.gwTunnelsLock.Unlock()
+
+       if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked {
+               return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("container is not running yet (state is %q)", ctr.State), http.StatusServiceUnavailable)
+       } else if ctr.State != arvados.ContainerStateRunning {
+               return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("container has ended (state is %q)", ctr.State), http.StatusGone)
+       }
+
+       // targetHost is the value we'll use in the Host header in our
+       // "Upgrade: ssh" http request. It's just a placeholder
+       // "localhost", unless we decide to connect directly, in which
+       // case we'll set it to the gateway's external ip:host. (The
+       // gateway doesn't even look at it, but we might as well.)
+       targetHost := "localhost"
+       myURL, _ := service.URLFromContext(ctx)
+
+       var rawconn net.Conn
+       if host, _, splitErr := net.SplitHostPort(ctr.GatewayAddress); splitErr == nil && host != "" && host != "127.0.0.1" {
+               // If crunch-run provided a GatewayAddress like
+               // "ipaddr:port", that means "ipaddr" is one of the
+               // external interfaces where the gateway is
+               // listening. In that case, it's the most
+               // reliable/direct option, so we use it even if a
+               // tunnel might also be available.
+               targetHost = ctr.GatewayAddress
+               rawconn, err = net.Dial("tcp", ctr.GatewayAddress)
+               if err != nil {
+                       return sshconn, httpserver.ErrorWithStatus(err, http.StatusServiceUnavailable)
                }
-       default:
-               err = httpserver.ErrorWithStatus(fmt.Errorf("container has ended (state is %q)", ctr.State), http.StatusGone)
-               return
+       } else if tunnel != nil && !(forceProxyForTest && !opts.NoForward) {
+               // If we can't connect directly, and the gateway has
+               // established a yamux tunnel with us, connect through
+               // the tunnel.
+               //
+               // ...except: forceProxyForTest means we are emulating
+               // a situation where the gateway has established a
+               // yamux tunnel with controller B, and the
+               // ContainerSSH request arrives at controller A. If
+               // opts.NoForward==false then we are acting as A, so
+               // we pretend not to have a tunnel, and fall through
+               // to the "tunurl" case below. If opts.NoForward==true
+               // then the client is A and we are acting as B, so we
+               // connect to our tunnel.
+               rawconn, err = tunnel.Open()
+               if err != nil {
+                       return sshconn, httpserver.ErrorWithStatus(err, http.StatusServiceUnavailable)
+               }
+       } else if ctr.GatewayAddress == "" {
+               return sshconn, httpserver.ErrorWithStatus(errors.New("container is running but gateway is not available"), http.StatusServiceUnavailable)
+       } else if tunurl := strings.TrimPrefix(ctr.GatewayAddress, "tunnel "); tunurl != ctr.GatewayAddress &&
+               tunurl != "" &&
+               tunurl != myURL.String() &&
+               !opts.NoForward {
+               // If crunch-run provided a GatewayAddress like
+               // "tunnel https://10.0.0.10:1010/", that means the
+               // gateway has established a yamux tunnel with the
+               // controller process at the indicated InternalURL
+               // (which isn't us, otherwise we would have had
+               // "tunnel != nil" above). We need to proxy through to
+               // the other controller process in order to use the
+               // tunnel.
+               for u := range conn.cluster.Services.Controller.InternalURLs {
+                       if u.String() == tunurl {
+                               ctxlog.FromContext(ctx).Debugf("proxying ContainerSSH request to other controller at %s", u)
+                               u := url.URL(u)
+                               arpc := rpc.NewConn(conn.cluster.ClusterID, &u, conn.cluster.TLS.Insecure, rpc.PassthroughTokenProvider)
+                               opts.NoForward = true
+                               return arpc.ContainerSSH(ctx, opts)
+                       }
+               }
+               ctxlog.FromContext(ctx).Warnf("container gateway provided a tunnel endpoint %s that is not one of Services.Controller.InternalURLs", tunurl)
+               return sshconn, httpserver.ErrorWithStatus(errors.New("container gateway is running but tunnel endpoint is invalid"), http.StatusServiceUnavailable)
+       } else {
+               return sshconn, httpserver.ErrorWithStatus(errors.New("container gateway is running but tunnel is down"), http.StatusServiceUnavailable)
        }
+
        // crunch-run uses a self-signed / unverifiable TLS
        // certificate, so we use the following scheme to ensure we're
        // not talking to a MITM.
@@ -93,7 +167,7 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt
        // X-Arvados-Authorization-Response header, proving that the
        // server knows ctrKey.
        var requestAuth, respondAuth string
-       netconn, err := tls.Dial("tcp", ctr.GatewayAddress, &tls.Config{
+       tlsconn := tls.Client(rawconn, &tls.Config{
                InsecureSkipVerify: true,
                VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
                        if len(rawCerts) == 0 {
@@ -111,47 +185,57 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt
                        return nil
                },
        })
+       err = tlsconn.HandshakeContext(ctx)
        if err != nil {
-               err = httpserver.ErrorWithStatus(err, http.StatusBadGateway)
-               return
+               return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("TLS handshake failed: %w", err), http.StatusBadGateway)
        }
        if respondAuth == "" {
-               err = httpserver.ErrorWithStatus(errors.New("BUG: no respondAuth"), http.StatusInternalServerError)
-               return
+               tlsconn.Close()
+               return sshconn, httpserver.ErrorWithStatus(errors.New("BUG: no respondAuth"), http.StatusInternalServerError)
        }
-       bufr := bufio.NewReader(netconn)
-       bufw := bufio.NewWriter(netconn)
+       bufr := bufio.NewReader(tlsconn)
+       bufw := bufio.NewWriter(tlsconn)
 
        u := url.URL{
                Scheme: "http",
-               Host:   ctr.GatewayAddress,
+               Host:   targetHost,
                Path:   "/ssh",
        }
-       bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n")
+       postform := url.Values{
+               "uuid":           {opts.UUID},
+               "detach_keys":    {opts.DetachKeys},
+               "login_username": {opts.LoginUsername},
+               "no_forward":     {fmt.Sprintf("%v", opts.NoForward)},
+       }
+       postdata := postform.Encode()
+       bufw.WriteString("POST " + u.String() + " HTTP/1.1\r\n")
        bufw.WriteString("Host: " + u.Host + "\r\n")
        bufw.WriteString("Upgrade: ssh\r\n")
-       bufw.WriteString("X-Arvados-Target-Uuid: " + opts.UUID + "\r\n")
        bufw.WriteString("X-Arvados-Authorization: " + requestAuth + "\r\n")
-       bufw.WriteString("X-Arvados-Detach-Keys: " + opts.DetachKeys + "\r\n")
-       bufw.WriteString("X-Arvados-Login-Username: " + opts.LoginUsername + "\r\n")
+       bufw.WriteString("Content-Type: application/x-www-form-urlencoded\r\n")
+       fmt.Fprintf(bufw, "Content-Length: %d\r\n", len(postdata))
        bufw.WriteString("\r\n")
+       bufw.WriteString(postdata)
        bufw.Flush()
-       resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"})
+       resp, err := http.ReadResponse(bufr, &http.Request{Method: "POST"})
        if err != nil {
-               err = httpserver.ErrorWithStatus(fmt.Errorf("error reading http response from gateway: %w", err), http.StatusBadGateway)
-               netconn.Close()
-               return
+               tlsconn.Close()
+               return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("error reading http response from gateway: %w", err), http.StatusBadGateway)
        }
-       if resp.Header.Get("X-Arvados-Authorization-Response") != respondAuth {
-               err = httpserver.ErrorWithStatus(errors.New("bad X-Arvados-Authorization-Response header"), http.StatusBadGateway)
-               netconn.Close()
-               return
+       defer resp.Body.Close()
+       if resp.StatusCode != http.StatusSwitchingProtocols {
+               body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 1000))
+               tlsconn.Close()
+               return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("unexpected status %s %q", resp.Status, body), http.StatusBadGateway)
        }
        if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" ||
                strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {
-               err = httpserver.ErrorWithStatus(errors.New("bad upgrade"), http.StatusBadGateway)
-               netconn.Close()
-               return
+               tlsconn.Close()
+               return sshconn, httpserver.ErrorWithStatus(errors.New("bad upgrade"), http.StatusBadGateway)
+       }
+       if resp.Header.Get("X-Arvados-Authorization-Response") != respondAuth {
+               tlsconn.Close()
+               return sshconn, httpserver.ErrorWithStatus(errors.New("bad X-Arvados-Authorization-Response header"), http.StatusBadGateway)
        }
 
        if !ctr.InteractiveSessionStarted {
@@ -162,13 +246,65 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt
                        },
                })
                if err != nil {
-                       netconn.Close()
-                       return
+                       tlsconn.Close()
+                       return sshconn, httpserver.ErrorWithStatus(err, http.StatusInternalServerError)
                }
        }
 
-       sshconn.Conn = netconn
+       sshconn.Conn = tlsconn
        sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
        sshconn.Logger = ctxlog.FromContext(ctx)
+       sshconn.Header = http.Header{"Upgrade": {"ssh"}}
+       return sshconn, nil
+}
+
+// ContainerGatewayTunnel sets up a tunnel enabling us (controller) to
+// connect to the caller's (crunch-run's) gateway server.
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, opts arvados.ContainerGatewayTunnelOptions) (resp arvados.ConnectionResponse, err error) {
+       h := hmac.New(sha256.New, []byte(conn.cluster.SystemRootToken))
+       fmt.Fprint(h, opts.UUID)
+       authSecret := fmt.Sprintf("%x", h.Sum(nil))
+       if subtle.ConstantTimeCompare([]byte(authSecret), []byte(opts.AuthSecret)) != 1 {
+               ctxlog.FromContext(ctx).Info("received incorrect auth_secret")
+               return resp, httpserver.ErrorWithStatus(errors.New("authentication error"), http.StatusUnauthorized)
+       }
+
+       muxconn, clientconn := net.Pipe()
+       tunnel, err := yamux.Server(muxconn, nil)
+       if err != nil {
+               clientconn.Close()
+               return resp, httpserver.ErrorWithStatus(err, http.StatusInternalServerError)
+       }
+
+       conn.gwTunnelsLock.Lock()
+       if conn.gwTunnels == nil {
+               conn.gwTunnels = map[string]*yamux.Session{opts.UUID: tunnel}
+       } else {
+               conn.gwTunnels[opts.UUID] = tunnel
+       }
+       conn.gwTunnelsLock.Unlock()
+
+       go func() {
+               <-tunnel.CloseChan()
+               conn.gwTunnelsLock.Lock()
+               if conn.gwTunnels[opts.UUID] == tunnel {
+                       delete(conn.gwTunnels, opts.UUID)
+               }
+               conn.gwTunnelsLock.Unlock()
+       }()
+
+       // Assuming we're acting as the backend of an http server,
+       // lib/controller/router will call resp's ServeHTTP handler,
+       // which upgrades the incoming http connection to a raw socket
+       // and connects it to our yamux.Server through our net.Pipe().
+       resp.Conn = clientconn
+       resp.Bufrw = &bufio.ReadWriter{Reader: bufio.NewReader(&bytes.Buffer{}), Writer: bufio.NewWriter(&bytes.Buffer{})}
+       resp.Logger = ctxlog.FromContext(ctx)
+       resp.Header = http.Header{"Upgrade": {"tunnel"}}
+       if u, ok := service.URLFromContext(ctx); ok {
+               resp.Header.Set("X-Arvados-Internal-Url", u.String())
+       } else if forceInternalURLForTest != nil {
+               resp.Header.Set("X-Arvados-Internal-Url", forceInternalURLForTest.String())
+       }
        return
 }
index 271760420153481daac1f0f129a63c684591b94b..2c882c7852a87b0ed23e2821d3e5fde6ec400439 100644 (file)
@@ -12,9 +12,14 @@ import (
        "io"
        "io/ioutil"
        "net"
+       "net/http/httptest"
+       "net/url"
+       "strings"
        "time"
 
        "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/lib/controller/router"
+       "git.arvados.org/arvados.git/lib/controller/rpc"
        "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
@@ -55,12 +60,26 @@ func (s *ContainerGatewaySuite) SetUpSuite(c *check.C) {
        fmt.Fprint(h, s.ctrUUID)
        authKey := fmt.Sprintf("%x", h.Sum(nil))
 
+       rtr := router.New(s.localdb, router.Config{})
+       srv := httptest.NewUnstartedServer(rtr)
+       srv.StartTLS()
+       // the test setup doesn't use lib/service so
+       // service.URLFromContext() returns nothing -- instead, this
+       // is how we advertise our internal URL and enable
+       // proxy-to-other-controller mode,
+       forceInternalURLForTest = &arvados.URL{Scheme: "https", Host: srv.Listener.Addr().String()}
+       ac := &arvados.Client{
+               APIHost:   srv.Listener.Addr().String(),
+               AuthToken: arvadostest.Dispatch1Token,
+               Insecure:  true,
+       }
        s.gw = &crunchrun.Gateway{
                ContainerUUID: s.ctrUUID,
                AuthSecret:    authKey,
                Address:       "localhost:0",
                Log:           ctxlog.TestLogger(c),
                Target:        crunchrun.GatewayTargetStub{},
+               ArvadosClient: ac,
        }
        c.Assert(s.gw.Start(), check.IsNil)
        rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
@@ -69,18 +88,25 @@ func (s *ContainerGatewaySuite) SetUpSuite(c *check.C) {
                Attrs: map[string]interface{}{
                        "state": arvados.ContainerStateLocked}})
        c.Assert(err, check.IsNil)
-       _, err = s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+}
+
+// SetUpTest resets per-test state: it forgets tunnel sessions
+// registered by earlier test cases, puts the test container back in
+// Running state with gateway_address pointing at s.gw, enables shell
+// access for admins and users, and clears interactive_session_started.
+func (s *ContainerGatewaySuite) SetUpTest(c *check.C) {
+       // clear any tunnel sessions started by previous test cases
+       s.localdb.gwTunnelsLock.Lock()
+       s.localdb.gwTunnels = nil
+       s.localdb.gwTunnelsLock.Unlock()
+
+       rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
+       _, err := s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
                UUID: s.ctrUUID,
                Attrs: map[string]interface{}{
                        "state":           arvados.ContainerStateRunning,
                        "gateway_address": s.gw.Address}})
        c.Assert(err, check.IsNil)
-}
 
-func (s *ContainerGatewaySuite) SetUpTest(c *check.C) {
        s.cluster.Containers.ShellAccess.Admin = true
        s.cluster.Containers.ShellAccess.User = true
-       _, err := arvadostest.DB(c, s.cluster).Exec(`update containers set interactive_session_started=$1 where uuid=$2`, false, s.ctrUUID)
+       // Test cases check this flag before and after connecting, so
+       // start each one with it cleared.
+       _, err = arvadostest.DB(c, s.cluster).Exec(`update containers set interactive_session_started=$1 where uuid=$2`, false, s.ctrUUID)
        c.Check(err, check.IsNil)
 }
 
@@ -234,3 +260,136 @@ func (s *ContainerGatewaySuite) TestConnectFail(c *check.C) {
        _, err = s.localdb.ContainerSSH(ctx, arvados.ContainerSSHOptions{UUID: s.ctrUUID})
        c.Check(err, check.ErrorMatches, `.* 404 .*`)
 }
+
+func (s *ContainerGatewaySuite) TestCreateTunnel(c *check.C) {
+       // no AuthSecret
+       conn, err := s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+               UUID: s.ctrUUID,
+       })
+       c.Check(err, check.ErrorMatches, `authentication error`)
+       c.Check(conn.Conn, check.IsNil)
+
+       // bogus AuthSecret
+       conn, err = s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+               UUID:       s.ctrUUID,
+               AuthSecret: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
+       })
+       c.Check(err, check.ErrorMatches, `authentication error`)
+       c.Check(conn.Conn, check.IsNil)
+
+       // good AuthSecret
+       conn, err = s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+               UUID:       s.ctrUUID,
+               AuthSecret: s.gw.AuthSecret,
+       })
+       c.Check(err, check.IsNil)
+       c.Check(conn.Conn, check.NotNil)
+}
+
+func (s *ContainerGatewaySuite) TestConnectThroughTunnelWithProxyOK(c *check.C) {
+       forceProxyForTest = true
+       defer func() { forceProxyForTest = false }()
+       s.cluster.Services.Controller.InternalURLs[*forceInternalURLForTest] = arvados.ServiceInstance{}
+       defer delete(s.cluster.Services.Controller.InternalURLs, *forceInternalURLForTest)
+       s.testConnectThroughTunnel(c, "")
+}
+
+// TestConnectThroughTunnelWithProxyError forces the proxy code path
+// without listing forceInternalURLForTest in
+// s.cluster.Services.Controller.InternalURLs, so the connection
+// attempt is expected to fail with "tunnel endpoint is invalid".
+func (s *ContainerGatewaySuite) TestConnectThroughTunnelWithProxyError(c *check.C) {
+       const wantErr = `.*tunnel endpoint is invalid.*`
+       forceProxyForTest = true
+       defer func() { forceProxyForTest = false }()
+       s.testConnectThroughTunnel(c, wantErr)
+}
+
+// TestConnectThroughTunnelNoProxyOK connects to the container
+// through the tunnel without forcing the proxy code path, and
+// expects the connection to succeed.
+func (s *ContainerGatewaySuite) TestConnectThroughTunnelNoProxyOK(c *check.C) {
+       s.testConnectThroughTunnel(c, "")
+}
+
+func (s *ContainerGatewaySuite) testConnectThroughTunnel(c *check.C, expectErrorMatch string) {
+       rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
+       // Until the tunnel starts up, set gateway_address to a value
+       // that can't work. We want to ensure the only way we can
+       // reach the gateway is through the tunnel.
+       gwaddr := "127.0.0.1:0"
+       tungw := &crunchrun.Gateway{
+               ContainerUUID: s.ctrUUID,
+               AuthSecret:    s.gw.AuthSecret,
+               Log:           ctxlog.TestLogger(c),
+               Target:        crunchrun.GatewayTargetStub{},
+               ArvadosClient: s.gw.ArvadosClient,
+               UpdateTunnelURL: func(url string) {
+                       c.Logf("UpdateTunnelURL(%q)", url)
+                       gwaddr = "tunnel " + url
+                       s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+                               UUID: s.ctrUUID,
+                               Attrs: map[string]interface{}{
+                                       "gateway_address": gwaddr}})
+               },
+       }
+       c.Assert(tungw.Start(), check.IsNil)
+
+       // We didn't supply an external hostname in the Address field,
+       // so Start() should assign a local address.
+       host, _, err := net.SplitHostPort(tungw.Address)
+       c.Assert(err, check.IsNil)
+       c.Check(host, check.Equals, "127.0.0.1")
+
+       _, err = s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+               UUID: s.ctrUUID,
+               Attrs: map[string]interface{}{
+                       "state":           arvados.ContainerStateRunning,
+                       "gateway_address": gwaddr}})
+       c.Assert(err, check.IsNil)
+
+       for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); time.Sleep(time.Second / 2) {
+               ctr, err := s.localdb.ContainerGet(s.ctx, arvados.GetOptions{UUID: s.ctrUUID})
+               c.Assert(err, check.IsNil)
+               c.Check(ctr.InteractiveSessionStarted, check.Equals, false)
+               c.Logf("ctr.GatewayAddress == %s", ctr.GatewayAddress)
+               if strings.HasPrefix(ctr.GatewayAddress, "tunnel ") {
+                       break
+               }
+       }
+
+       c.Log("connecting to gateway through tunnel")
+       arpc := rpc.NewConn("", &url.URL{Scheme: "https", Host: s.gw.ArvadosClient.APIHost}, true, rpc.PassthroughTokenProvider)
+       sshconn, err := arpc.ContainerSSH(s.ctx, arvados.ContainerSSHOptions{UUID: s.ctrUUID})
+       if expectErrorMatch != "" {
+               c.Check(err, check.ErrorMatches, expectErrorMatch)
+               return
+       }
+       c.Assert(err, check.IsNil)
+       c.Assert(sshconn.Conn, check.NotNil)
+       defer sshconn.Conn.Close()
+
+       done := make(chan struct{})
+       go func() {
+               defer close(done)
+
+               // Receive text banner
+               buf := make([]byte, 12)
+               _, err := io.ReadFull(sshconn.Conn, buf)
+               c.Check(err, check.IsNil)
+               c.Check(string(buf), check.Equals, "SSH-2.0-Go\r\n")
+
+               // Send text banner
+               _, err = sshconn.Conn.Write([]byte("SSH-2.0-Fake\r\n"))
+               c.Check(err, check.IsNil)
+
+               // Receive binary
+               _, err = io.ReadFull(sshconn.Conn, buf[:4])
+               c.Check(err, check.IsNil)
+
+               // If we can get this far into an SSH handshake...
+               c.Logf("was able to read %x -- success, tunnel is working", buf[:4])
+       }()
+       select {
+       case <-done:
+       case <-time.After(time.Second):
+               c.Fail()
+       }
+       ctr, err := s.localdb.ContainerGet(s.ctx, arvados.GetOptions{UUID: s.ctrUUID})
+       c.Check(err, check.IsNil)
+       c.Check(ctr.InteractiveSessionStarted, check.Equals, true)
+}
index 13dfcac16abb0bb27c7b1f3d50d024436453f97c..47b8cb47112ad5990d2f80dd23c72cf98fb85a70 100644 (file)
@@ -63,10 +63,13 @@ func (p *proxy) Do(
                        hdrOut[k] = v
                }
        }
-       xff := reqIn.RemoteAddr
-       if xffIn := reqIn.Header.Get("X-Forwarded-For"); xffIn != "" {
-               xff = xffIn + "," + xff
+       xff := ""
+       for _, xffIn := range reqIn.Header["X-Forwarded-For"] {
+               if xffIn != "" {
+                       xff += xffIn + ","
+               }
        }
+       xff += reqIn.RemoteAddr
        hdrOut.Set("X-Forwarded-For", xff)
        if hdrOut.Get("X-Forwarded-Proto") == "" {
                hdrOut.Set("X-Forwarded-Proto", reqIn.URL.Scheme)
index 06141b1033e3f0034e003eab07da11c17153496e..31f2e1d7baf5098a377ffe9d1acd7b737958231d 100644 (file)
@@ -176,6 +176,7 @@ var boolParams = map[string]bool{
        "bypass_federation":       true,
        "recursive":               true,
        "exclude_home_project":    true,
+       "no_forward":              true,
 }
 
 func stringToBool(s string) bool {
index 586ea8e676ec9ae2a08a7f1855f7f17d8ef5754a..80d5e929850cd18df389daeddb18eb4b12387a38 100644 (file)
@@ -244,6 +244,24 @@ func (rtr *router) addRoutes() {
                                return rtr.backend.ContainerSSH(ctx, *opts.(*arvados.ContainerSSHOptions))
                        },
                },
+               {
+                       // arvados-client built before commit
+                       // bdc29d3129f6d75aa9ce0a24ffb849a272b06f08
+                       // used GET with params in headers instead of
+                       // POST form
+                       arvados.APIEndpoint{"GET", "arvados/v1/connect/{uuid}/ssh", ""},
+                       func() interface{} { return &arvados.ContainerSSHOptions{} },
+                       func(ctx context.Context, opts interface{}) (interface{}, error) {
+                               return nil, httpError(http.StatusGone, fmt.Errorf("API endpoint is obsolete -- please upgrade your arvados-client program"))
+                       },
+               },
+               {
+                       arvados.EndpointContainerGatewayTunnel,
+                       func() interface{} { return &arvados.ContainerGatewayTunnelOptions{} },
+                       func(ctx context.Context, opts interface{}) (interface{}, error) {
+                               return rtr.backend.ContainerGatewayTunnel(ctx, *opts.(*arvados.ContainerGatewayTunnelOptions))
+                       },
+               },
                {
                        arvados.EndpointGroupCreate,
                        func() interface{} { return &arvados.CreateOptions{} },
index 1148068d70896c2ff4b16074c731fedf23bb5bbb..0e532f23c070d8b5c64a15bd8bef46494702ae5a 100644 (file)
@@ -23,6 +23,7 @@ import (
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/auth"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
 )
 
@@ -331,7 +332,36 @@ func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOption
 // ContainerSSH returns a connection to the out-of-band SSH server for
 // a running container. If the returned error is nil, the caller is
 // responsible for closing sshconn.Conn.
-func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (sshconn arvados.ContainerSSHConnection, err error) {
+func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (sshconn arvados.ConnectionResponse, err error) {
+       u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1))
+       if err != nil {
+               err = fmt.Errorf("url.Parse: %w", err)
+               return
+       }
+       return conn.socket(ctx, u, "ssh", url.Values{
+               "detach_keys":    {options.DetachKeys},
+               "login_username": {options.LoginUsername},
+               "no_forward":     {fmt.Sprintf("%v", options.NoForward)},
+       })
+}
+
+// ContainerGatewayTunnel returns a connection to a yamux session on
+// the controller. The caller should connect the returned resp.Conn to
+// a client-side yamux session.
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (tunnelconn arvados.ConnectionResponse, err error) {
+       u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", options.UUID, -1))
+       if err != nil {
+               err = fmt.Errorf("url.Parse: %w", err)
+               return
+       }
+       return conn.socket(ctx, u, "tunnel", url.Values{
+               "auth_secret": {options.AuthSecret},
+       })
+}
+
+// socket sets up a socket using the specified API endpoint and
+// upgrade header.
+func (conn *Conn) socket(ctx context.Context, u *url.URL, upgradeHeader string, postform url.Values) (connresp arvados.ConnectionResponse, err error) {
        addr := conn.baseURL.Host
        if strings.Index(addr, ":") < 1 || (strings.Contains(addr, "::") && addr[0] != '[') {
                // hostname or ::1 or 1::1
@@ -343,8 +373,7 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH
        }
        netconn, err := tls.Dial("tcp", addr, &tls.Config{InsecureSkipVerify: insecure})
        if err != nil {
-               err = fmt.Errorf("tls.Dial: %w", err)
-               return
+               return connresp, fmt.Errorf("tls.Dial: %w", err)
        }
        defer func() {
                if err != nil {
@@ -354,36 +383,30 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH
        bufr := bufio.NewReader(netconn)
        bufw := bufio.NewWriter(netconn)
 
-       u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1))
-       if err != nil {
-               err = fmt.Errorf("tls.Dial: %w", err)
-               return
-       }
-       u.RawQuery = url.Values{
-               "detach_keys":    {options.DetachKeys},
-               "login_username": {options.LoginUsername},
-       }.Encode()
        tokens, err := conn.tokenProvider(ctx)
        if err != nil {
-               return
+               return connresp, err
        } else if len(tokens) < 1 {
-               err = httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
-               return
+               return connresp, httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
        }
-       bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n")
+       postdata := postform.Encode()
+       bufw.WriteString("POST " + u.String() + " HTTP/1.1\r\n")
        bufw.WriteString("Authorization: Bearer " + tokens[0] + "\r\n")
        bufw.WriteString("Host: " + u.Host + "\r\n")
-       bufw.WriteString("Upgrade: ssh\r\n")
+       bufw.WriteString("Upgrade: " + upgradeHeader + "\r\n")
+       bufw.WriteString("Content-Type: application/x-www-form-urlencoded\r\n")
+       fmt.Fprintf(bufw, "Content-Length: %d\r\n", len(postdata))
        bufw.WriteString("\r\n")
+       bufw.WriteString(postdata)
        bufw.Flush()
-       resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"})
+       resp, err := http.ReadResponse(bufr, &http.Request{Method: "POST"})
        if err != nil {
-               err = fmt.Errorf("http.ReadResponse: %w", err)
-               return
+               return connresp, fmt.Errorf("http.ReadResponse: %w", err)
        }
+       defer resp.Body.Close()
        if resp.StatusCode != http.StatusSwitchingProtocols {
-               defer resp.Body.Close()
-               body, _ := ioutil.ReadAll(resp.Body)
+               ctxlog.FromContext(ctx).Infof("rpc.Conn.socket: server %s did not switch protocols, got status %s", u.String(), resp.Status)
+               body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 10000))
                var message string
                var errDoc httpserver.ErrorResponse
                if err := json.Unmarshal(body, &errDoc); err == nil {
@@ -391,17 +414,16 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH
                } else {
                        message = fmt.Sprintf("%q", body)
                }
-               err = fmt.Errorf("server did not provide a tunnel: %s (HTTP %d)", message, resp.StatusCode)
-               return
+               return connresp, fmt.Errorf("server did not provide a tunnel: %s: %s", resp.Status, message)
        }
-       if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" ||
+       if strings.ToLower(resp.Header.Get("Upgrade")) != upgradeHeader ||
                strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {
-               err = fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection"))
-               return
+               return connresp, fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection"))
        }
-       sshconn.Conn = netconn
-       sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
-       return
+       connresp.Conn = netconn
+       connresp.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
+       connresp.Header = resp.Header
+       return connresp, nil
 }
 
 func (conn *Conn) ContainerRequestCreate(ctx context.Context, options arvados.CreateOptions) (arvados.ContainerRequest, error) {
index 01457015e16f1870bf4adf4785b8f9c08cec10d5..3cb93fc746a78d01afd6a40023e00903a5c3a846 100644 (file)
@@ -14,16 +14,22 @@ import (
        "io"
        "net"
        "net/http"
+       "net/url"
        "os"
        "os/exec"
        "sync"
        "syscall"
+       "time"
 
+       "git.arvados.org/arvados.git/lib/controller/rpc"
        "git.arvados.org/arvados.git/lib/selfsigned"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/auth"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "github.com/creack/pty"
        "github.com/google/shlex"
+       "github.com/hashicorp/yamux"
        "golang.org/x/crypto/ssh"
        "golang.org/x/net/context"
 )
@@ -45,12 +51,32 @@ func (GatewayTargetStub) InjectCommand(ctx context.Context, detachKeys, username
 
 type Gateway struct {
        ContainerUUID string
-       Address       string // listen host:port; if port=0, Start() will change it to the selected port
-       AuthSecret    string
-       Target        GatewayTarget
-       Log           interface {
+       // Caller should set Address to "", or "host:0" or "host:port"
+       // where host is a known external IP address; port is a
+       // desired port number to listen on; and ":0" chooses an
+       // available dynamic port.
+       //
+       // If Address is "", Start() listens only on the loopback
+       // interface (and changes Address to "127.0.0.1:port").
+       // Otherwise it listens on all interfaces.
+       //
+       // If Address is "host:0", Start() updates Address to
+       // "host:port".
+       Address    string
+       AuthSecret string
+       Target     GatewayTarget
+       Log        interface {
                Printf(fmt string, args ...interface{})
        }
+       // If non-nil, set up a ContainerGatewayTunnel, so that the
+       // controller can connect to us even if our external IP
+       // address is unknown or not routable from controller.
+       ArvadosClient *arvados.Client
+
+       // When a tunnel is connected or reconnected, this func (if
+       // not nil) will be called with the InternalURL of the
+       // controller process at the other end of the tunnel.
+       UpdateTunnelURL func(url string)
 
        sshConfig   ssh.ServerConfig
        requestAuth string
@@ -99,7 +125,22 @@ func (gw *Gateway) Start() error {
        // from arvados-controller, and PORT is either the desired
        // port where we should run our gateway server, or "0" if we
        // should choose an available port.
-       host, port, err := net.SplitHostPort(gw.Address)
+       extAddr := gw.Address
+       // Generally we can't know which local interface corresponds
+       // to an externally reachable IP address, so if we expect to
+       // be reachable by external hosts, we listen on all
+       // interfaces.
+       listenHost := ""
+       if extAddr == "" {
+               // If the dispatcher doesn't tell us our external IP
+               // address, controller will only be able to connect
+               // through the tunnel (see runTunnel), so our gateway
+               // server only needs to listen on the loopback
+               // interface.
+               extAddr = "127.0.0.1:0"
+               listenHost = "127.0.0.1"
+       }
+       extHost, extPort, err := net.SplitHostPort(extAddr)
        if err != nil {
                return err
        }
@@ -121,26 +162,104 @@ func (gw *Gateway) Start() error {
                                Certificates: []tls.Certificate{cert},
                        },
                },
-               Addr: ":" + port,
+               Addr: net.JoinHostPort(listenHost, extPort),
        }
        err = srv.Start()
        if err != nil {
                return err
        }
-       // Get the port number we are listening on (the port might be
+       go func() {
+               err := srv.Wait()
+               gw.Log.Printf("gateway server stopped: %s", err)
+       }()
+       // Get the port number we are listening on (extPort might be
        // "0" or a port name, in which case this will be different).
-       _, port, err = net.SplitHostPort(srv.Addr)
+       _, listenPort, err := net.SplitHostPort(srv.Addr)
        if err != nil {
                return err
        }
-       // When changing state to Running, we will set
-       // gateway_address to "HOST:PORT" where HOST is our
-       // external hostname/IP as provided by arvados-dispatch-cloud,
-       // and PORT is the port number we ended up listening on.
-       gw.Address = net.JoinHostPort(host, port)
+       // When changing state to Running, the caller will want to set
+       // gateway_address to a "HOST:PORT" that, if controller
+       // connects to it, will reach this gateway server.
+       //
+       // The most likely thing to work is: HOST is our external
+       // hostname/IP as provided by the caller
+       // (arvados-dispatch-cloud) or 127.0.0.1 to indicate
+       // non-tunnel connections aren't available; and PORT is the
+       // port number we are listening on.
+       gw.Address = net.JoinHostPort(extHost, listenPort)
+       gw.Log.Printf("gateway server listening at %s", gw.Address)
+       if gw.ArvadosClient != nil {
+               go gw.maintainTunnel(gw.Address)
+       }
        return nil
 }
 
+func (gw *Gateway) maintainTunnel(addr string) {
+       for ; ; time.Sleep(5 * time.Second) {
+               err := gw.runTunnel(addr)
+               gw.Log.Printf("runTunnel: %s", err)
+       }
+}
+
+// runTunnel connects to controller and sets up a tunnel through
+// which controller can connect to the gateway server at the given
+// addr. Each stream that controller opens on the tunnel is relayed to
+// a new TCP connection to addr. runTunnel returns when the tunnel
+// cannot be set up or stops accepting streams, and it always returns
+// a non-nil error. The tunnel connection is closed before returning.
+func (gw *Gateway) runTunnel(addr string) error {
+       ctx := auth.NewContext(context.Background(), auth.NewCredentials(gw.ArvadosClient.AuthToken))
+       arpc := rpc.NewConn("", &url.URL{Scheme: "https", Host: gw.ArvadosClient.APIHost}, gw.ArvadosClient.Insecure, rpc.PassthroughTokenProvider)
+       tun, err := arpc.ContainerGatewayTunnel(ctx, arvados.ContainerGatewayTunnelOptions{
+               UUID:       gw.ContainerUUID,
+               AuthSecret: gw.AuthSecret,
+       })
+       if err != nil {
+               return fmt.Errorf("error creating gateway tunnel: %w", err)
+       }
+       // Release the connection however we return. Otherwise every
+       // retry in maintainTunnel would leak one.
+       defer tun.Conn.Close()
+       // The rpc client parsed the 101 response with a buffered
+       // reader (http.ReadResponse), which may already hold bytes
+       // sent after the response headers. Read through tun.Bufrw so
+       // those bytes reach the mux instead of being dropped.
+       var rd io.Reader = tun.Conn
+       if tun.Bufrw != nil {
+               rd = tun.Bufrw
+       }
+       mux, err := yamux.Client(struct {
+               io.Reader
+               io.Writer
+               io.Closer
+       }{rd, tun.Conn, tun.Conn}, nil)
+       if err != nil {
+               return fmt.Errorf("error setting up mux client end: %w", err)
+       }
+       defer mux.Close()
+       if tunnelURL := tun.Header.Get("X-Arvados-Internal-Url"); tunnelURL != "" && gw.UpdateTunnelURL != nil {
+               gw.UpdateTunnelURL(tunnelURL)
+       }
+       for {
+               muxconn, err := mux.AcceptStream()
+               if err != nil {
+                       return err
+               }
+               gw.Log.Printf("tunnel connection %d started", muxconn.StreamID())
+               go func() {
+                       defer muxconn.Close()
+                       gwconn, err := net.Dial("tcp", addr)
+                       if err != nil {
+                               gw.Log.Printf("tunnel connection %d: error connecting to %s: %s", muxconn.StreamID(), addr, err)
+                               return
+                       }
+                       defer gwconn.Close()
+                       var wg sync.WaitGroup
+                       wg.Add(2)
+                       go func() {
+                               defer wg.Done()
+                               _, err := io.Copy(gwconn, muxconn)
+                               if err != nil {
+                                       gw.Log.Printf("tunnel connection %d: mux end: %s", muxconn.StreamID(), err)
+                               }
+                               // Closing gwconn unblocks the
+                               // copy in the other direction.
+                               gwconn.Close()
+                       }()
+                       go func() {
+                               defer wg.Done()
+                               _, err := io.Copy(muxconn, gwconn)
+                               if err != nil {
+                                       gw.Log.Printf("tunnel connection %d: gateway end: %s", muxconn.StreamID(), err)
+                               }
+                               muxconn.Close()
+                       }()
+                       wg.Wait()
+                       gw.Log.Printf("tunnel connection %d finished", muxconn.StreamID())
+               }()
+       }
+}
+
 // handleSSH connects to an SSH server that allows the caller to run
 // interactive commands as root (or any other desired user) inside the
 // container. The tunnel itself can only be created by an
@@ -166,11 +285,12 @@ func (gw *Gateway) handleSSH(w http.ResponseWriter, req *http.Request) {
        // In future we'll handle browser traffic too, but for now the
        // only traffic we expect is an SSH tunnel from
        // (*lib/controller/localdb.Conn)ContainerSSH()
-       if req.Method != "GET" || req.Header.Get("Upgrade") != "ssh" {
+       if req.Method != "POST" || req.Header.Get("Upgrade") != "ssh" {
                http.Error(w, "path not found", http.StatusNotFound)
                return
        }
-       if want := req.Header.Get("X-Arvados-Target-Uuid"); want != gw.ContainerUUID {
+       req.ParseForm()
+       if want := req.Form.Get("uuid"); want != gw.ContainerUUID {
                http.Error(w, fmt.Sprintf("misdirected request: meant for %q but received by crunch-run %q", want, gw.ContainerUUID), http.StatusBadGateway)
                return
        }
@@ -178,8 +298,8 @@ func (gw *Gateway) handleSSH(w http.ResponseWriter, req *http.Request) {
                http.Error(w, "bad X-Arvados-Authorization header", http.StatusUnauthorized)
                return
        }
-       detachKeys := req.Header.Get("X-Arvados-Detach-Keys")
-       username := req.Header.Get("X-Arvados-Login-Username")
+       detachKeys := req.Form.Get("detach_keys")
+       username := req.Form.Get("login_username")
        if username == "" {
                username = "root"
        }
@@ -204,7 +324,9 @@ func (gw *Gateway) handleSSH(w http.ResponseWriter, req *http.Request) {
        ctx := req.Context()
 
        conn, newchans, reqs, err := ssh.NewServerConn(netconn, &gw.sshConfig)
-       if err != nil {
+       if err == io.EOF {
+               return
+       } else if err != nil {
                gw.Log.Printf("ssh.NewServerConn: %s", err)
                return
        }
@@ -278,9 +400,11 @@ func (gw *Gateway) handleDirectTCPIP(ctx context.Context, newch ssh.NewChannel)
 func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, detachKeys, username string) {
        ch, reqs, err := newch.Accept()
        if err != nil {
-               gw.Log.Printf("accept session channel: %s", err)
+               gw.Log.Printf("error accepting session channel: %s", err)
                return
        }
+       defer ch.Close()
+
        var pty0, tty0 *os.File
        // Where to send errors/messages for the client to see
        logw := io.Writer(ch.Stderr())
@@ -289,10 +413,28 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
        eol := "\n"
        // Env vars to add to child process
        termEnv := []string(nil)
-       for req := range reqs {
+
+       started := 0
+       wantClose := make(chan struct{})
+       for {
+               var req *ssh.Request
+               select {
+               case r, ok := <-reqs:
+                       if !ok {
+                               return
+                       }
+                       req = r
+               case <-wantClose:
+                       return
+               }
                ok := false
                switch req.Type {
                case "shell", "exec":
+                       if started++; started != 1 {
+                               // RFC 4254 6.5: "Only one of these
+                               // requests can succeed per channel."
+                               break
+                       }
                        ok = true
                        var payload struct {
                                Command string
@@ -312,7 +454,7 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
                                }
                                defer func() {
                                        ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
-                                       ch.Close()
+                                       close(wantClose)
                                }()
 
                                cmd, err := gw.Target.InjectCommand(ctx, detachKeys, username, tty0 != nil, execargs)
@@ -322,20 +464,39 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
                                        resp.Status = 1
                                        return
                                }
-                               cmd.Stdin = ch
-                               cmd.Stdout = ch
-                               cmd.Stderr = ch.Stderr()
                                if tty0 != nil {
                                        cmd.Stdin = tty0
                                        cmd.Stdout = tty0
                                        cmd.Stderr = tty0
-                                       var wg sync.WaitGroup
-                                       defer wg.Wait()
-                                       wg.Add(2)
-                                       go func() { io.Copy(ch, pty0); wg.Done() }()
-                                       go func() { io.Copy(pty0, ch); wg.Done() }()
+                                       go io.Copy(ch, pty0)
+                                       go io.Copy(pty0, ch)
                                        // Send our own debug messages to tty as well.
                                        logw = tty0
+                               } else {
+                                       // StdinPipe may seem
+                                       // superfluous here, but it's
+                                       // not: it causes cmd.Run() to
+                                       // return when the subprocess
+                                       // exits. Without it, Run()
+                                       // waits for stdin to close,
+                                       // which causes "ssh ... echo
+                                       // ok" (with the client's
+                                       // stdin connected to a
+                                       // terminal or something) to
+                                       // hang.
+                                       stdin, err := cmd.StdinPipe()
+                                       if err != nil {
+                                               fmt.Fprintln(ch.Stderr(), err)
+                                               ch.CloseWrite()
+                                               resp.Status = 1
+                                               return
+                                       }
+                                       go func() {
+                                               io.Copy(stdin, ch)
+                                               stdin.Close()
+                                       }()
+                                       cmd.Stdout = ch
+                                       cmd.Stderr = ch.Stderr()
                                }
                                cmd.SysProcAttr = &syscall.SysProcAttr{
                                        Setctty: tty0 != nil,
@@ -403,7 +564,7 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
                        // would be a gaping security
                        // hole).
                default:
-                       // fmt.Fprintf(logw, "declining %q req"+eol, req.Type)
+                       // fmt.Fprintf(logw, "declined request %q on ssh channel"+eol, req.Type)
                }
                if req.WantReply {
                        req.Reply(ok, nil)
index 68181395fadcbafba4227b0d7cc16eb4b2f8624e..eadf22876f9d6016668f049ddda2f100ce8eb0a3 100644 (file)
@@ -1744,6 +1744,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
        brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
        flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+       version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
 
        ignoreDetachFlag := false
        if len(args) > 0 && args[0] == "-no-detach" {
@@ -1759,6 +1760,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
                return code
+       } else if *version {
+               fmt.Fprintln(stdout, prog, cmd.Version.String())
+               return 0
        } else if !*list && flags.NArg() != 1 {
                fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
                return 2
@@ -1906,11 +1910,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                // not safe to run a gateway service without an auth
                // secret
                cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
-       } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
-               // dispatcher did not tell us which external IP
-               // address to advertise --> no gateway service
-               cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
        } else {
+               gwListen := os.Getenv("GatewayAddress")
                cr.gateway = Gateway{
                        Address:       gwListen,
                        AuthSecret:    gwAuthSecret,
@@ -1918,6 +1919,18 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        Target:        cr.executor,
                        Log:           cr.CrunchLog,
                }
+               if gwListen == "" {
+                       // Direct connection won't work, so we use the
+                       // gateway_address field to indicate the
+                       // internalURL of the controller process that
+                       // has the current tunnel connection.
+                       cr.gateway.ArvadosClient = cr.dispatcherClient
+                       cr.gateway.UpdateTunnelURL = func(url string) {
+                               cr.gateway.Address = "tunnel " + url
+                               cr.DispatcherArvClient.Update("containers", containerUUID,
+                                       arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+                       }
+               }
                err = cr.gateway.Start()
                if err != nil {
                        log.Printf("error starting gateway server: %s", err)
index 9860c7949727b169ccc1df66e15e4f223dc7e7cd..d569020824c22373d5098e0afd4c14d6156dd773 100644 (file)
@@ -229,13 +229,25 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
                c.Check(s.logFiles["crunch-run.txt"], Not(Matches), `(?ms).* at http://169\.254\..*`)
                c.Check(s.logFiles["stderr.txt"], Matches, `(?ms).*ARVADOS_KEEP_SERVICES=http://[\d\.]{7,}:\d+\n.*`)
        }
+}
 
+func (s *integrationSuite) TestRunTrivialContainerWithNoLocalKeepstore(c *C) {
        // Check that (1) config is loaded from $ARVADOS_CONFIG when
        // not provided on stdin and (2) if a local keepstore is not
        // started, crunch-run.txt explains why not.
        s.SetUpTest(c)
        s.stdin.Reset()
        s.testRunTrivialContainer(c)
+       c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because KeepBuffers=0 in config\n.*`)
+
+       s.SetUpTest(c)
+       s.args = []string{"-config", c.MkDir() + "/config.yaml"}
+       s.stdin.Reset()
+       buf, err := ioutil.ReadFile(os.Getenv("ARVADOS_CONFIG"))
+       c.Assert(err, IsNil)
+       err = ioutil.WriteFile(s.args[1], bytes.Replace(buf, []byte("LocalKeepBlobBuffersPerVCPU: 0"), []byte("LocalKeepBlobBuffersPerVCPU: 1"), -1), 0666)
+       c.Assert(err, IsNil)
+       s.testRunTrivialContainer(c)
        c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because a volume \(zzzzz-nyw5e-00000000000000\d\) uses AccessViaHosts\n.*`)
 
        // Check that config read errors are logged
@@ -248,7 +260,7 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
        s.SetUpTest(c)
        s.args = []string{"-config", c.MkDir() + "/config-unreadable.yaml"}
        s.stdin.Reset()
-       err := ioutil.WriteFile(s.args[1], []byte{}, 0)
+       err = ioutil.WriteFile(s.args[1], []byte{}, 0)
        c.Check(err, IsNil)
        s.testRunTrivialContainer(c)
        c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* permission denied\n.*`)
index 0d9324784d503e1fb30789e45e2f65ae7b84fdd1..e2348337e62992eb4463947690e809e1927bb232 100644 (file)
@@ -6,6 +6,8 @@ package lsf
 
 import (
        "context"
+       "crypto/hmac"
+       "crypto/sha256"
        "errors"
        "fmt"
        "math"
@@ -274,7 +276,12 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s
        var crArgs []string
        crArgs = append(crArgs, crunchRunCommand...)
        crArgs = append(crArgs, container.UUID)
-       crScript := execScript(crArgs)
+
+       h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken))
+       fmt.Fprint(h, container.UUID)
+       authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+       crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret})
 
        bsubArgs, err := disp.bsubArgs(container)
        if err != nil {
@@ -353,8 +360,14 @@ func (disp *dispatcher) checkLsfQueueForOrphans() {
        }
 }
 
-func execScript(args []string) []byte {
-       s := "#!/bin/sh\nexec"
+func execScript(args []string, env map[string]string) []byte {
+       s := "#!/bin/sh\n"
+       for k, v := range env {
+               s += k + `='`
+               s += strings.Replace(v, `'`, `'\''`, -1)
+               s += `' `
+       }
+       s += `exec`
        for _, w := range args {
                s += ` '`
                s += strings.Replace(w, `'`, `'\''`, -1)
index d76ece1eddb4560f47f9d7892313e1f9e3882746..3797a17f50d504ae2894ac4c6a68f598b4e37564 100644 (file)
@@ -10,6 +10,7 @@ import (
        "encoding/json"
        "io"
        "net"
+       "net/http"
 
        "github.com/sirupsen/logrus"
 )
@@ -47,7 +48,8 @@ var (
        EndpointContainerDelete               = APIEndpoint{"DELETE", "arvados/v1/containers/{uuid}", ""}
        EndpointContainerLock                 = APIEndpoint{"POST", "arvados/v1/containers/{uuid}/lock", ""}
        EndpointContainerUnlock               = APIEndpoint{"POST", "arvados/v1/containers/{uuid}/unlock", ""}
-       EndpointContainerSSH                  = APIEndpoint{"GET", "arvados/v1/connect/{uuid}/ssh", ""} // move to /containers after #17014 fixes routing
+       EndpointContainerSSH                  = APIEndpoint{"POST", "arvados/v1/connect/{uuid}/ssh", ""}            // move to /containers after #17014 fixes routing
+       EndpointContainerGatewayTunnel        = APIEndpoint{"POST", "arvados/v1/connect/{uuid}/gateway_tunnel", ""} // move to /containers after #17014 fixes routing
        EndpointContainerRequestCreate        = APIEndpoint{"POST", "arvados/v1/container_requests", "container_request"}
        EndpointContainerRequestUpdate        = APIEndpoint{"PATCH", "arvados/v1/container_requests/{uuid}", "container_request"}
        EndpointContainerRequestGet           = APIEndpoint{"GET", "arvados/v1/container_requests/{uuid}", ""}
@@ -96,12 +98,19 @@ type ContainerSSHOptions struct {
        UUID          string `json:"uuid"`
        DetachKeys    string `json:"detach_keys"`
        LoginUsername string `json:"login_username"`
+       NoForward     bool   `json:"no_forward"`
 }
 
-type ContainerSSHConnection struct {
+type ConnectionResponse struct {
        Conn   net.Conn           `json:"-"`
        Bufrw  *bufio.ReadWriter  `json:"-"`
        Logger logrus.FieldLogger `json:"-"`
+       Header http.Header        `json:"-"`
+}
+
+type ContainerGatewayTunnelOptions struct {
+       UUID       string `json:"uuid"`
+       AuthSecret string `json:"auth_secret"`
 }
 
 type GetOptions struct {
@@ -254,7 +263,8 @@ type API interface {
        ContainerDelete(ctx context.Context, options DeleteOptions) (Container, error)
        ContainerLock(ctx context.Context, options GetOptions) (Container, error)
        ContainerUnlock(ctx context.Context, options GetOptions) (Container, error)
-       ContainerSSH(ctx context.Context, options ContainerSSHOptions) (ContainerSSHConnection, error)
+       ContainerSSH(ctx context.Context, options ContainerSSHOptions) (ConnectionResponse, error)
+       ContainerGatewayTunnel(ctx context.Context, options ContainerGatewayTunnelOptions) (ConnectionResponse, error)
        ContainerRequestCreate(ctx context.Context, options CreateOptions) (ContainerRequest, error)
        ContainerRequestUpdate(ctx context.Context, options UpdateOptions) (ContainerRequest, error)
        ContainerRequestGet(ctx context.Context, options GetOptions) (ContainerRequest, error)
index 00c98d572ea010e2db43ca1c62bfa0eb341b1e7d..897ae434e14bd0a0392d041a125a598b2c1d8b34 100644 (file)
@@ -14,14 +14,17 @@ import (
        "github.com/sirupsen/logrus"
 )
 
-func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+func (cresp ConnectionResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+       defer cresp.Conn.Close()
        hj, ok := w.(http.Hijacker)
        if !ok {
                http.Error(w, "ResponseWriter does not support connection upgrade", http.StatusInternalServerError)
                return
        }
        w.Header().Set("Connection", "upgrade")
-       w.Header().Set("Upgrade", "ssh")
+       for k, v := range cresp.Header {
+               w.Header()[k] = v
+       }
        w.WriteHeader(http.StatusSwitchingProtocols)
        conn, bufrw, err := hj.Hijack()
        if err != nil {
@@ -31,44 +34,46 @@ func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http
        defer conn.Close()
 
        var bytesIn, bytesOut int64
+       ctx, cancel := context.WithCancel(req.Context())
        var wg sync.WaitGroup
-       ctx, cancel := context.WithCancel(context.Background())
        wg.Add(1)
        go func() {
                defer wg.Done()
                defer cancel()
-               n, err := io.CopyN(conn, sshconn.Bufrw, int64(sshconn.Bufrw.Reader.Buffered()))
+               n, err := io.CopyN(conn, cresp.Bufrw, int64(cresp.Bufrw.Reader.Buffered()))
                bytesOut += n
                if err == nil {
-                       n, err = io.Copy(conn, sshconn.Conn)
+                       n, err = io.Copy(conn, cresp.Conn)
                        bytesOut += n
                }
                if err != nil {
-                       ctxlog.FromContext(req.Context()).WithError(err).Error("error copying downstream")
+                       ctxlog.FromContext(ctx).WithError(err).Error("error copying downstream")
                }
        }()
        wg.Add(1)
        go func() {
                defer wg.Done()
                defer cancel()
-               n, err := io.CopyN(sshconn.Conn, bufrw, int64(bufrw.Reader.Buffered()))
+               n, err := io.CopyN(cresp.Conn, bufrw, int64(bufrw.Reader.Buffered()))
                bytesIn += n
                if err == nil {
-                       n, err = io.Copy(sshconn.Conn, conn)
+                       n, err = io.Copy(cresp.Conn, conn)
                        bytesIn += n
                }
                if err != nil {
-                       ctxlog.FromContext(req.Context()).WithError(err).Error("error copying upstream")
+                       ctxlog.FromContext(ctx).WithError(err).Error("error copying upstream")
                }
        }()
        <-ctx.Done()
-       if sshconn.Logger != nil {
-               go func() {
-                       wg.Wait()
-                       sshconn.Logger.WithFields(logrus.Fields{
+       go func() {
+               // Wait for both io.Copy goroutines to finish and increment
+               // their byte counters.
+               wg.Wait()
+               if cresp.Logger != nil {
+                       cresp.Logger.WithFields(logrus.Fields{
                                "bytesIn":  bytesIn,
                                "bytesOut": bytesOut,
                        }).Info("closed connection")
-               }()
-       }
+               }
+       }()
 }
index f49d29ce2b8cbbc7b9a8c564e20d86673588f58a..d6da579d6b9ce1323dfbeb9b50f993232822379a 100644 (file)
@@ -109,9 +109,13 @@ func (as *APIStub) ContainerUnlock(ctx context.Context, options arvados.GetOptio
        as.appendCall(ctx, as.ContainerUnlock, options)
        return arvados.Container{}, as.Error
 }
-func (as *APIStub) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ContainerSSHConnection, error) {
+func (as *APIStub) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ConnectionResponse, error) {
        as.appendCall(ctx, as.ContainerSSH, options)
-       return arvados.ContainerSSHConnection{}, as.Error
+       return arvados.ConnectionResponse{}, as.Error
+}
+func (as *APIStub) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (arvados.ConnectionResponse, error) {
+       as.appendCall(ctx, as.ContainerGatewayTunnel, options)
+       return arvados.ConnectionResponse{}, as.Error
 }
 func (as *APIStub) ContainerRequestCreate(ctx context.Context, options arvados.CreateOptions) (arvados.ContainerRequest, error) {
        as.appendCall(ctx, as.ContainerRequestCreate, options)
index 28cb0953f3c42a348a623a4f3f54aadc27d7958c..e32d385f73fc0849c30d4a730cb3b83c6a4425c6 100644 (file)
@@ -831,6 +831,7 @@ def setup_config():
                     "JobsAPI": {
                         "GitInternalDir": os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'internal.git'),
                     },
+                    "LocalKeepBlobBuffersPerVCPU": 0,
                     "SupportedDockerImageFormats": {"v1": {}},
                     "ShellAccess": {
                         "Admin": True,
index 08f87bbdb13b3a4ae21ce4d26b694ecc2dd57cef..43af0721c4b7b58faf0bdaf477e386502dad8c50 100644 (file)
@@ -498,9 +498,9 @@ class Container < ArvadosModel
       permitted.push :priority
 
     when Running
-      permitted.push :priority, :output_properties, *progress_attrs
+      permitted.push :priority, :output_properties, :gateway_address, *progress_attrs
       if self.state_changed?
-        permitted.push :started_at, :gateway_address
+        permitted.push :started_at
       end
       if !self.interactive_session_started_was
         permitted.push :interactive_session_started
index bcf99da2e3442ba088d9effbe80a633e6467f857..a4c0ce17926092ec451583404a21f56374f79176 100644 (file)
@@ -958,6 +958,7 @@ class ContainerTest < ActiveSupport::TestCase
         Thread.current[:user] = auth.user
       end
 
+      assert c.update_attributes(gateway_address: "127.0.0.1:9")
       assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
       assert c.update_attributes(runtime_status: {'warning' => 'something happened'})
       assert c.update_attributes(progress: 0.5)
index c31d7997522fa1caa73507a009d680b3835a2f46..c774584d683c338e70629320d9d602b9fea30814 100644 (file)
@@ -7,6 +7,8 @@ package dispatchslurm
 
 import (
        "context"
+       "crypto/hmac"
+       "crypto/sha256"
        "fmt"
        "log"
        "math"
@@ -213,7 +215,12 @@ func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []s
        crArgs := append([]string(nil), crunchRunCommand...)
        crArgs = append(crArgs, "--runtime-engine="+disp.cluster.Containers.RuntimeEngine)
        crArgs = append(crArgs, container.UUID)
-       crScript := strings.NewReader(execScript(crArgs))
+
+       h := hmac.New(sha256.New, []byte(disp.cluster.SystemRootToken))
+       fmt.Fprint(h, container.UUID)
+       authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+       crScript := strings.NewReader(execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret}))
 
        sbArgs, err := disp.sbatchArgs(container)
        if err != nil {
index fb16e593e5c5648720452fd49194910a4b2021b5..d0bfbc4a929dd8067a8e3e3e519b17dc0777f475 100644 (file)
@@ -8,8 +8,14 @@ import (
        "strings"
 )
 
-func execScript(args []string) string {
-       s := "#!/bin/sh\nexec"
+func execScript(args []string, env map[string]string) string {
+       s := "#!/bin/sh\n"
+       for k, v := range env {
+               s += k + `='`
+               s += strings.Replace(v, `'`, `'\''`, -1)
+               s += `' `
+       }
+       s += `exec`
        for _, w := range args {
                s += ` '`
                s += strings.Replace(w, `'`, `'\''`, -1)
index 00d70190dd043416302c38fc526dc551dc08f687..bba9a05755cb36ec9848d11b17bc0187657e87d3 100644 (file)
@@ -23,6 +23,7 @@ func (s *ScriptSuite) TestExecScript(c *C) {
                {[]string{`foo"`, "'waz 'qux\n"}, `exec 'foo"' ''\''waz '\''qux` + "\n" + `'`},
        } {
                c.Logf("%+v -> %+v", test.args, test.script)
-               c.Check(execScript(test.args), Equals, "#!/bin/sh\n"+test.script+"\n")
+               c.Check(execScript(test.args, nil), Equals, "#!/bin/sh\n"+test.script+"\n")
        }
+       c.Check(execScript([]string{"sh", "-c", "echo $foo"}, map[string]string{"foo": "b'ar"}), Equals, "#!/bin/sh\nfoo='b'\\''ar' exec 'sh' '-c' 'echo $foo'\n")
 }