19166: Set up tunnel for container gateway requests
author Tom Clegg <tom@curii.com>
Sat, 18 Jun 2022 04:34:03 +0000 (00:34 -0400)
committer Tom Clegg <tom@curii.com>
Fri, 24 Jun 2022 18:23:25 +0000 (14:23 -0400)
in slurm/lsf environments, where controller doesn't know how to (and
perhaps can't) connect directly to the compute node where crunch-run
is running.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

15 files changed:
go.mod
go.sum
lib/controller/federation/conn.go
lib/controller/localdb/conn.go
lib/controller/localdb/container_gateway.go
lib/controller/localdb/container_gateway_test.go
lib/controller/router/router.go
lib/controller/rpc/conn.go
lib/crunchrun/container_gateway.go
lib/crunchrun/crunchrun.go
sdk/go/arvados/api.go
sdk/go/arvados/container_gateway.go
sdk/go/arvadostest/api.go
services/api/app/models/container.rb
services/api/test/unit/container_test.rb

diff --git a/go.mod b/go.mod
index 525bae11ee49be2fd83fe7d717384eaf580550dd..aced60dbc4e0ee83281ba8e03a8e0511a02ab220 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -72,6 +72,7 @@ require (
        github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
        github.com/golang/protobuf v1.5.0 // indirect
        github.com/googleapis/gax-go/v2 v2.0.5 // indirect
+       github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
        github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
        github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
        github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5 // indirect
diff --git a/go.sum b/go.sum
index 82a8d83d7e2be701f3ab3f9b41312883cb62f3ad..422a891e00ef26add10565538607630d2770863d 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -433,6 +433,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
 github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 h1:xixZ2bWeofWV68J+x6AzmKuVM/JWCQwkWm6GW/MUR6I=
+github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
index d9f587852d149da46cd49ddbfc9dd46095fe180e..08d3ab1a6ec9f6f46ab4e9b494c81674d4644a17 100644 (file)
@@ -379,6 +379,10 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH
        return conn.chooseBackend(options.UUID).ContainerSSH(ctx, options)
 }
 
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (arvados.ConnectionResponse, error) {
+       return conn.chooseBackend(options.UUID).ContainerGatewayTunnel(ctx, options)
+}
+
 func (conn *Conn) ContainerRequestList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerRequestList, error) {
        return conn.generated_ContainerRequestList(ctx, options)
 }
index 104cfe28f5e0cccfb9cf785955229b4f3b297fdf..a36822ad6b1f5df1f73ffbc3536d76a7215f1817 100644 (file)
@@ -11,6 +11,7 @@ import (
        "net/http"
        "os"
        "strings"
+       "sync"
        "time"
 
        "git.arvados.org/arvados.git/lib/controller/railsproxy"
@@ -18,6 +19,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "github.com/hashicorp/yamux"
        "github.com/sirupsen/logrus"
 )
 
@@ -31,6 +33,8 @@ type Conn struct {
        lastVocabularyRefreshCheck time.Time
        lastVocabularyError        error
        loginController
+       gwTunnels     map[string]*yamux.Session
+       gwTunnelsLock sync.Mutex
 }
 
 func NewConn(cluster *arvados.Cluster) *Conn {
index 3b40eccaff68c138e498b9ec497f5f704e249f64..fcfa599e4db81c5ae64a8f720a7afd30dcbc3ca3 100644 (file)
@@ -6,13 +6,16 @@ package localdb
 
 import (
        "bufio"
+       "bytes"
        "context"
        "crypto/hmac"
        "crypto/sha256"
+       "crypto/subtle"
        "crypto/tls"
        "crypto/x509"
        "errors"
        "fmt"
+       "net"
        "net/http"
        "net/url"
        "strings"
@@ -21,6 +24,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/auth"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "github.com/hashicorp/yamux"
 )
 
 // ContainerSSH returns a connection to the SSH server in the
@@ -61,19 +65,33 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt
                }
        }
 
-       switch ctr.State {
-       case arvados.ContainerStateQueued, arvados.ContainerStateLocked:
+       conn.gwTunnelsLock.Lock()
+       tunnel := conn.gwTunnels[opts.UUID]
+       conn.gwTunnelsLock.Unlock()
+
+       if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked {
                err = httpserver.ErrorWithStatus(fmt.Errorf("container is not running yet (state is %q)", ctr.State), http.StatusServiceUnavailable)
                return
-       case arvados.ContainerStateRunning:
-               if ctr.GatewayAddress == "" {
-                       err = httpserver.ErrorWithStatus(errors.New("container is running but gateway is not available -- installation problem or feature not supported"), http.StatusServiceUnavailable)
-                       return
-               }
-       default:
+       } else if ctr.State != arvados.ContainerStateRunning {
                err = httpserver.ErrorWithStatus(fmt.Errorf("container has ended (state is %q)", ctr.State), http.StatusGone)
                return
        }
+
+       var rawconn net.Conn
+       if ctr.GatewayAddress != "" && !strings.HasPrefix(ctr.GatewayAddress, "127.0.0.1:") {
+               rawconn, err = net.Dial("tcp", ctr.GatewayAddress)
+       } else if tunnel != nil {
+               rawconn, err = tunnel.Open()
+       } else if ctr.GatewayAddress == "" {
+               err = errors.New("container is running but gateway is not available")
+       } else {
+               err = errors.New("container gateway is running but tunnel is down")
+       }
+       if err != nil {
+               err = httpserver.ErrorWithStatus(err, http.StatusServiceUnavailable)
+               return
+       }
+
        // crunch-run uses a self-signed / unverifiable TLS
        // certificate, so we use the following scheme to ensure we're
        // not talking to a MITM.
@@ -93,7 +111,7 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt
        // X-Arvados-Authorization-Response header, proving that the
        // server knows ctrKey.
        var requestAuth, respondAuth string
-       netconn, err := tls.Dial("tcp", ctr.GatewayAddress, &tls.Config{
+       tlsconn := tls.Client(rawconn, &tls.Config{
                InsecureSkipVerify: true,
                VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
                        if len(rawCerts) == 0 {
@@ -111,23 +129,25 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt
                        return nil
                },
        })
+       err = tlsconn.HandshakeContext(ctx)
        if err != nil {
                err = httpserver.ErrorWithStatus(err, http.StatusBadGateway)
                return
        }
        if respondAuth == "" {
+               tlsconn.Close()
                err = httpserver.ErrorWithStatus(errors.New("BUG: no respondAuth"), http.StatusInternalServerError)
                return
        }
-       bufr := bufio.NewReader(netconn)
-       bufw := bufio.NewWriter(netconn)
+       bufr := bufio.NewReader(tlsconn)
+       bufw := bufio.NewWriter(tlsconn)
 
        u := url.URL{
                Scheme: "http",
                Host:   ctr.GatewayAddress,
                Path:   "/ssh",
        }
-       bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n")
+       bufw.WriteString("POST " + u.String() + " HTTP/1.1\r\n")
        bufw.WriteString("Host: " + u.Host + "\r\n")
        bufw.WriteString("Upgrade: ssh\r\n")
        bufw.WriteString("X-Arvados-Target-Uuid: " + opts.UUID + "\r\n")
@@ -139,18 +159,18 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt
        resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"})
        if err != nil {
                err = httpserver.ErrorWithStatus(fmt.Errorf("error reading http response from gateway: %w", err), http.StatusBadGateway)
-               netconn.Close()
+               tlsconn.Close()
                return
        }
        if resp.Header.Get("X-Arvados-Authorization-Response") != respondAuth {
                err = httpserver.ErrorWithStatus(errors.New("bad X-Arvados-Authorization-Response header"), http.StatusBadGateway)
-               netconn.Close()
+               tlsconn.Close()
                return
        }
        if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" ||
                strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {
                err = httpserver.ErrorWithStatus(errors.New("bad upgrade"), http.StatusBadGateway)
-               netconn.Close()
+               tlsconn.Close()
                return
        }
 
@@ -162,13 +182,60 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt
                        },
                })
                if err != nil {
-                       netconn.Close()
+                       tlsconn.Close()
                        return
                }
        }
 
-       sshconn.Conn = netconn
+       sshconn.Conn = tlsconn
        sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
        sshconn.Logger = ctxlog.FromContext(ctx)
+       sshconn.UpgradeHeader = "ssh"
+       return
+}
+
+// ContainerGatewayTunnel sets up a tunnel enabling us (controller) to
+// connect to the caller's (crunch-run's) gateway server.
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, opts arvados.ContainerGatewayTunnelOptions) (resp arvados.ConnectionResponse, err error) {
+       h := hmac.New(sha256.New, []byte(conn.cluster.SystemRootToken))
+       fmt.Fprint(h, opts.UUID)
+       authSecret := fmt.Sprintf("%x", h.Sum(nil))
+       if subtle.ConstantTimeCompare([]byte(authSecret), []byte(opts.AuthSecret)) != 1 {
+               ctxlog.FromContext(ctx).Info("received incorrect auth_secret")
+               return resp, httpserver.ErrorWithStatus(errors.New("authentication error"), http.StatusUnauthorized)
+       }
+
+       muxconn, clientconn := net.Pipe()
+       tunnel, err := yamux.Server(muxconn, nil)
+       if err != nil {
+               clientconn.Close()
+               return resp, httpserver.ErrorWithStatus(err, http.StatusInternalServerError)
+       }
+
+       conn.gwTunnelsLock.Lock()
+       if conn.gwTunnels == nil {
+               conn.gwTunnels = map[string]*yamux.Session{opts.UUID: tunnel}
+       } else {
+               conn.gwTunnels[opts.UUID] = tunnel
+       }
+       conn.gwTunnelsLock.Unlock()
+
+       go func() {
+               <-tunnel.CloseChan()
+               conn.gwTunnelsLock.Lock()
+               if conn.gwTunnels[opts.UUID] == tunnel {
+                       delete(conn.gwTunnels, opts.UUID)
+               }
+               conn.gwTunnelsLock.Unlock()
+       }()
+
+       // Assuming we're acting as the backend of an http server,
+       // lib/controller/router will call resp's ServeHTTP handler,
+       // which upgrades the incoming http connection to a raw socket
+       // and connects it to our yamux.Server through our net.Pipe().
+       resp.Conn = clientconn
+       resp.Bufrw = &bufio.ReadWriter{Reader: bufio.NewReader(&bytes.Buffer{}), Writer: bufio.NewWriter(&bytes.Buffer{})}
+       resp.Logger = ctxlog.FromContext(ctx)
+       resp.UpgradeHeader = "tunnel"
        return
 }
index 271760420153481daac1f0f129a63c684591b94b..b3b604e53419f82b6bb475d50b5000c3954f9422 100644 (file)
@@ -12,9 +12,11 @@ import (
        "io"
        "io/ioutil"
        "net"
+       "net/http/httptest"
        "time"
 
        "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/lib/controller/router"
        "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
@@ -55,12 +57,21 @@ func (s *ContainerGatewaySuite) SetUpSuite(c *check.C) {
        fmt.Fprint(h, s.ctrUUID)
        authKey := fmt.Sprintf("%x", h.Sum(nil))
 
+       rtr := router.New(s.localdb, router.Config{})
+       srv := httptest.NewUnstartedServer(rtr)
+       srv.StartTLS()
+       ac := &arvados.Client{
+               APIHost:   srv.Listener.Addr().String(),
+               AuthToken: arvadostest.Dispatch1Token,
+               Insecure:  true,
+       }
        s.gw = &crunchrun.Gateway{
                ContainerUUID: s.ctrUUID,
                AuthSecret:    authKey,
                Address:       "localhost:0",
                Log:           ctxlog.TestLogger(c),
                Target:        crunchrun.GatewayTargetStub{},
+               ArvadosClient: ac,
        }
        c.Assert(s.gw.Start(), check.IsNil)
        rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
@@ -69,18 +80,25 @@ func (s *ContainerGatewaySuite) SetUpSuite(c *check.C) {
                Attrs: map[string]interface{}{
                        "state": arvados.ContainerStateLocked}})
        c.Assert(err, check.IsNil)
-       _, err = s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+}
+
+func (s *ContainerGatewaySuite) SetUpTest(c *check.C) {
+       // clear any tunnel sessions started by previous test cases
+       s.localdb.gwTunnelsLock.Lock()
+       s.localdb.gwTunnels = nil
+       s.localdb.gwTunnelsLock.Unlock()
+
+       rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
+       _, err := s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
                UUID: s.ctrUUID,
                Attrs: map[string]interface{}{
                        "state":           arvados.ContainerStateRunning,
                        "gateway_address": s.gw.Address}})
        c.Assert(err, check.IsNil)
-}
 
-func (s *ContainerGatewaySuite) SetUpTest(c *check.C) {
        s.cluster.Containers.ShellAccess.Admin = true
        s.cluster.Containers.ShellAccess.User = true
-       _, err := arvadostest.DB(c, s.cluster).Exec(`update containers set interactive_session_started=$1 where uuid=$2`, false, s.ctrUUID)
+       _, err = arvadostest.DB(c, s.cluster).Exec(`update containers set interactive_session_started=$1 where uuid=$2`, false, s.ctrUUID)
        c.Check(err, check.IsNil)
 }
 
@@ -234,3 +252,99 @@ func (s *ContainerGatewaySuite) TestConnectFail(c *check.C) {
        _, err = s.localdb.ContainerSSH(ctx, arvados.ContainerSSHOptions{UUID: s.ctrUUID})
        c.Check(err, check.ErrorMatches, `.* 404 .*`)
 }
+
+func (s *ContainerGatewaySuite) TestCreateTunnel(c *check.C) {
+       // no AuthSecret
+       conn, err := s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+               UUID: s.ctrUUID,
+       })
+       c.Check(err, check.ErrorMatches, `authentication error`)
+       c.Check(conn.Conn, check.IsNil)
+
+       // bogus AuthSecret
+       conn, err = s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+               UUID:       s.ctrUUID,
+               AuthSecret: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
+       })
+       c.Check(err, check.ErrorMatches, `authentication error`)
+       c.Check(conn.Conn, check.IsNil)
+
+       // good AuthSecret
+       conn, err = s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+               UUID:       s.ctrUUID,
+               AuthSecret: s.gw.AuthSecret,
+       })
+       c.Check(err, check.IsNil)
+       c.Check(conn.Conn, check.NotNil)
+}
+
+func (s *ContainerGatewaySuite) TestConnectThroughTunnel(c *check.C) {
+       tungw := &crunchrun.Gateway{
+               ContainerUUID: s.ctrUUID,
+               AuthSecret:    s.gw.AuthSecret,
+               Log:           ctxlog.TestLogger(c),
+               Target:        crunchrun.GatewayTargetStub{},
+               ArvadosClient: s.gw.ArvadosClient,
+       }
+       c.Assert(tungw.Start(), check.IsNil)
+
+       // We didn't supply an external hostname in the Address field,
+       // so Start() should assign a local address.
+       host, _, err := net.SplitHostPort(tungw.Address)
+       c.Assert(err, check.IsNil)
+       c.Check(host, check.Equals, "127.0.0.1")
+
+       // Set the gateway_address field to 127.0.0.1:badport to
+       // ensure the ContainerSSH() handler connects through the
+       // tunnel, rather than the gateway server on 127.0.0.1 (which
+       // wouldn't work IRL where controller and gateway are on
+       // different hosts, but would allow the test to cheat).
+       rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
+       _, err = s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+               UUID: s.ctrUUID,
+               Attrs: map[string]interface{}{
+                       "state":           arvados.ContainerStateRunning,
+                       "gateway_address": "127.0.0.1:0"}})
+       c.Assert(err, check.IsNil)
+
+       ctr, err := s.localdb.ContainerGet(s.ctx, arvados.GetOptions{UUID: s.ctrUUID})
+       c.Check(err, check.IsNil)
+       c.Check(ctr.InteractiveSessionStarted, check.Equals, false)
+       c.Check(ctr.GatewayAddress, check.Equals, "127.0.0.1:0")
+
+       c.Log("connecting to gateway through tunnel")
+       sshconn, err := s.localdb.ContainerSSH(s.ctx, arvados.ContainerSSHOptions{UUID: s.ctrUUID})
+       c.Assert(err, check.IsNil)
+       c.Assert(sshconn.Conn, check.NotNil)
+       defer sshconn.Conn.Close()
+
+       done := make(chan struct{})
+       go func() {
+               defer close(done)
+
+               // Receive text banner
+               buf := make([]byte, 12)
+               _, err := io.ReadFull(sshconn.Conn, buf)
+               c.Check(err, check.IsNil)
+               c.Check(string(buf), check.Equals, "SSH-2.0-Go\r\n")
+
+               // Send text banner
+               _, err = sshconn.Conn.Write([]byte("SSH-2.0-Fake\r\n"))
+               c.Check(err, check.IsNil)
+
+               // Receive binary
+               _, err = io.ReadFull(sshconn.Conn, buf[:4])
+               c.Check(err, check.IsNil)
+
+               // If we can get this far into an SSH handshake...
+               c.Logf("was able to read %x -- success, tunnel is working", buf[:4])
+       }()
+       select {
+       case <-done:
+       case <-time.After(time.Second):
+               c.Fail()
+       }
+       ctr, err = s.localdb.ContainerGet(s.ctx, arvados.GetOptions{UUID: s.ctrUUID})
+       c.Check(err, check.IsNil)
+       c.Check(ctr.InteractiveSessionStarted, check.Equals, true)
+}
index 586ea8e676ec9ae2a08a7f1855f7f17d8ef5754a..a87dbca92664254df43b12b73a2fd15917f137fe 100644 (file)
@@ -244,6 +244,13 @@ func (rtr *router) addRoutes() {
                                return rtr.backend.ContainerSSH(ctx, *opts.(*arvados.ContainerSSHOptions))
                        },
                },
+               {
+                       arvados.EndpointContainerGatewayTunnel,
+                       func() interface{} { return &arvados.ContainerGatewayTunnelOptions{} },
+                       func(ctx context.Context, opts interface{}) (interface{}, error) {
+                               return rtr.backend.ContainerGatewayTunnel(ctx, *opts.(*arvados.ContainerGatewayTunnelOptions))
+                       },
+               },
                {
                        arvados.EndpointGroupCreate,
                        func() interface{} { return &arvados.CreateOptions{} },
index 1148068d70896c2ff4b16074c731fedf23bb5bbb..8e25ca0d05b741e963addc987359fe8575404388 100644 (file)
@@ -332,6 +332,39 @@ func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOption
 // a running container. If the returned error is nil, the caller is
 // responsible for closing sshconn.Conn.
 func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (sshconn arvados.ContainerSSHConnection, err error) {
+       u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1))
+       if err != nil {
+               err = fmt.Errorf("url.Parse: %w", err)
+               return
+       }
+       u.RawQuery = url.Values{
+               "detach_keys":    {options.DetachKeys},
+               "login_username": {options.LoginUsername},
+       }.Encode()
+       resp, err := conn.socket(ctx, u, "ssh", nil)
+       if err != nil {
+               return
+       }
+       return arvados.ContainerSSHConnection(resp), nil
+}
+
+// ContainerGatewayTunnel returns a connection to a yamux session on
+// the controller. The caller should connect the returned resp.Conn to
+// a client-side yamux session.
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (tunnelconn arvados.ConnectionResponse, err error) {
+       u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", options.UUID, -1))
+       if err != nil {
+               err = fmt.Errorf("url.Parse: %w", err)
+               return
+       }
+       return conn.socket(ctx, u, "tunnel", url.Values{
+               "auth_secret": {options.AuthSecret},
+       })
+}
+
+// socket sets up a socket using the specified API endpoint and
+// upgrade header.
+func (conn *Conn) socket(ctx context.Context, u *url.URL, upgradeHeader string, postform url.Values) (connresp arvados.ConnectionResponse, err error) {
        addr := conn.baseURL.Host
        if strings.Index(addr, ":") < 1 || (strings.Contains(addr, "::") && addr[0] != '[') {
                // hostname or ::1 or 1::1
@@ -354,15 +387,6 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH
        bufr := bufio.NewReader(netconn)
        bufw := bufio.NewWriter(netconn)
 
-       u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1))
-       if err != nil {
-               err = fmt.Errorf("tls.Dial: %w", err)
-               return
-       }
-       u.RawQuery = url.Values{
-               "detach_keys":    {options.DetachKeys},
-               "login_username": {options.LoginUsername},
-       }.Encode()
        tokens, err := conn.tokenProvider(ctx)
        if err != nil {
                return
@@ -370,11 +394,17 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH
                err = httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
                return
        }
-       bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n")
+       postdata := postform.Encode()
+       bufw.WriteString("POST " + u.String() + " HTTP/1.1\r\n")
        bufw.WriteString("Authorization: Bearer " + tokens[0] + "\r\n")
        bufw.WriteString("Host: " + u.Host + "\r\n")
-       bufw.WriteString("Upgrade: ssh\r\n")
+       bufw.WriteString("Upgrade: " + upgradeHeader + "\r\n")
+       bufw.WriteString("Content-Type: application/x-www-form-urlencoded\r\n")
+       fmt.Fprintf(bufw, "Content-Length: %d\r\n", len(postdata))
        bufw.WriteString("\r\n")
+       if len(postdata) > 0 {
+               bufw.WriteString(postdata)
+       }
        bufw.Flush()
        resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"})
        if err != nil {
@@ -394,13 +424,13 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH
                err = fmt.Errorf("server did not provide a tunnel: %s (HTTP %d)", message, resp.StatusCode)
                return
        }
-       if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" ||
+       if strings.ToLower(resp.Header.Get("Upgrade")) != upgradeHeader ||
                strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {
                err = fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection"))
                return
        }
-       sshconn.Conn = netconn
-       sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
+       connresp.Conn = netconn
+       connresp.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
        return
 }
 
index 01457015e16f1870bf4adf4785b8f9c08cec10d5..49eb68c0f50ba263667ba49429dee83a91bd6c5b 100644 (file)
@@ -14,16 +14,22 @@ import (
        "io"
        "net"
        "net/http"
+       "net/url"
        "os"
        "os/exec"
        "sync"
        "syscall"
+       "time"
 
+       "git.arvados.org/arvados.git/lib/controller/rpc"
        "git.arvados.org/arvados.git/lib/selfsigned"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/auth"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "github.com/creack/pty"
        "github.com/google/shlex"
+       "github.com/hashicorp/yamux"
        "golang.org/x/crypto/ssh"
        "golang.org/x/net/context"
 )
@@ -45,12 +51,27 @@ func (GatewayTargetStub) InjectCommand(ctx context.Context, detachKeys, username
 
 type Gateway struct {
        ContainerUUID string
-       Address       string // listen host:port; if port=0, Start() will change it to the selected port
-       AuthSecret    string
-       Target        GatewayTarget
-       Log           interface {
+       // Caller should set Address to "", or "host:0" or "host:port"
+       // where host is a known external IP address; port is a
+       // desired port number to listen on; and ":0" chooses an
+       // available dynamic port.
+       //
+       // If Address is "", Start() listens only on the loopback
+       // interface (and changes Address to "127.0.0.1:port").
+       // Otherwise it listens on all interfaces.
+       //
+       // If Address is "host:0", Start() updates Address to
+       // "host:port".
+       Address    string
+       AuthSecret string
+       Target     GatewayTarget
+       Log        interface {
                Printf(fmt string, args ...interface{})
        }
+       // If non-nil, set up a ContainerGatewayTunnel, so that the
+       // controller can connect to us even if our external IP
+       // address is unknown or not routable from controller.
+       ArvadosClient *arvados.Client
 
        sshConfig   ssh.ServerConfig
        requestAuth string
@@ -99,7 +120,22 @@ func (gw *Gateway) Start() error {
        // from arvados-controller, and PORT is either the desired
        // port where we should run our gateway server, or "0" if we
        // should choose an available port.
-       host, port, err := net.SplitHostPort(gw.Address)
+       extAddr := gw.Address
+       // Generally we can't know which local interface corresponds
+       // to an externally reachable IP address, so if we expect to
+       // be reachable by external hosts, we listen on all
+       // interfaces.
+       listenHost := ""
+       if extAddr == "" {
+               // If the dispatcher doesn't tell us our external IP
+               // address, controller will only be able to connect
+               // through the tunnel (see runTunnel), so our gateway
+               // server only needs to listen on the loopback
+               // interface.
+               extAddr = "127.0.0.1:0"
+               listenHost = "127.0.0.1"
+       }
+       extHost, extPort, err := net.SplitHostPort(extAddr)
        if err != nil {
                return err
        }
@@ -121,26 +157,87 @@ func (gw *Gateway) Start() error {
                                Certificates: []tls.Certificate{cert},
                        },
                },
-               Addr: ":" + port,
+               Addr: net.JoinHostPort(listenHost, extPort),
        }
        err = srv.Start()
        if err != nil {
                return err
        }
-       // Get the port number we are listening on (the port might be
+       // Get the port number we are listening on (extPort might be
        // "0" or a port name, in which case this will be different).
-       _, port, err = net.SplitHostPort(srv.Addr)
+       _, listenPort, err := net.SplitHostPort(srv.Addr)
        if err != nil {
                return err
        }
-       // When changing state to Running, we will set
-       // gateway_address to "HOST:PORT" where HOST is our
-       // external hostname/IP as provided by arvados-dispatch-cloud,
-       // and PORT is the port number we ended up listening on.
-       gw.Address = net.JoinHostPort(host, port)
+       // When changing state to Running, the caller will want to set
+       // gateway_address to a "HOST:PORT" that, if controller
+       // connects to it, will reach this gateway server.
+       //
+       // The most likely thing to work is: HOST is our external
+       // hostname/IP as provided by the caller
+       // (arvados-dispatch-cloud) or 127.0.0.1 to indicate
+       // non-tunnel connections aren't available; and PORT is the
+       // port number we are listening on.
+       gw.Address = net.JoinHostPort(extHost, listenPort)
+       if gw.ArvadosClient != nil {
+               go gw.maintainTunnel(gw.Address)
+       }
        return nil
 }
 
+// maintainTunnel keeps a tunnel to controller running for the
+// gateway server at addr: it calls runTunnel, logs the error when it
+// returns, waits 5 seconds, and tries again. It never returns.
+func (gw *Gateway) maintainTunnel(addr string) {
+       const retryDelay = 5 * time.Second
+       for {
+               tunnelErr := gw.runTunnel(addr)
+               gw.Log.Printf("runTunnel: %s", tunnelErr)
+               time.Sleep(retryDelay)
+       }
+}
+
+// runTunnel connects to controller and sets up a tunnel through
+// which controller can connect to the gateway server at the given
+// addr.
+//
+// It blocks, proxying each connection accepted from the tunnel to
+// addr, until the tunnel cannot be set up or stops accepting
+// connections. It then releases the tunnel's resources and returns
+// the error. The caller (maintainTunnel) is expected to retry.
+func (gw *Gateway) runTunnel(addr string) error {
+       ctx := auth.NewContext(context.Background(), auth.NewCredentials(gw.ArvadosClient.AuthToken))
+       arpc := rpc.NewConn("", &url.URL{Scheme: "https", Host: gw.ArvadosClient.APIHost}, gw.ArvadosClient.Insecure, rpc.PassthroughTokenProvider)
+       tun, err := arpc.ContainerGatewayTunnel(ctx, arvados.ContainerGatewayTunnelOptions{
+               UUID:       gw.ContainerUUID,
+               AuthSecret: gw.AuthSecret,
+       })
+       if err != nil {
+               return fmt.Errorf("error creating gateway tunnel: %w", err)
+       }
+       mux, err := yamux.Client(tun.Conn, nil)
+       if err != nil {
+               // No session owns the connection yet, so close it
+               // ourselves rather than leak it.
+               tun.Conn.Close()
+               return fmt.Errorf("error setting up mux client end: %w", err)
+       }
+       // Closing the session also closes tun.Conn. Without this,
+       // every retry by maintainTunnel would leave a stale
+       // connection to controller behind.
+       defer mux.Close()
+       for {
+               muxconn, err := mux.Accept()
+               if err != nil {
+                       return err
+               }
+               gw.Log.Printf("receiving connection from tunnel, remoteAddr %s", muxconn.RemoteAddr().String())
+               go func() {
+                       defer muxconn.Close()
+                       gwconn, err := net.Dial("tcp", addr)
+                       if err != nil {
+                               gw.Log.Printf("error connecting to %s on behalf of tunnel connection: %s", addr, err)
+                               return
+                       }
+                       defer gwconn.Close()
+                       // Copy in both directions. When either
+                       // direction ends, close the destination so
+                       // the opposite copy unblocks instead of
+                       // waiting forever on a dead peer.
+                       var wg sync.WaitGroup
+                       wg.Add(2)
+                       go func() {
+                               defer wg.Done()
+                               io.Copy(gwconn, muxconn)
+                               gwconn.Close()
+                       }()
+                       go func() {
+                               defer wg.Done()
+                               io.Copy(muxconn, gwconn)
+                               muxconn.Close()
+                       }()
+                       wg.Wait()
+               }()
+       }
+}
+
 // handleSSH connects to an SSH server that allows the caller to run
 // interactive commands as root (or any other desired user) inside the
 // container. The tunnel itself can only be created by an
@@ -166,7 +263,7 @@ func (gw *Gateway) handleSSH(w http.ResponseWriter, req *http.Request) {
        // In future we'll handle browser traffic too, but for now the
        // only traffic we expect is an SSH tunnel from
        // (*lib/controller/localdb.Conn)ContainerSSH()
-       if req.Method != "GET" || req.Header.Get("Upgrade") != "ssh" {
+       if req.Method != "POST" || req.Header.Get("Upgrade") != "ssh" {
                http.Error(w, "path not found", http.StatusNotFound)
                return
        }
@@ -204,7 +301,9 @@ func (gw *Gateway) handleSSH(w http.ResponseWriter, req *http.Request) {
        ctx := req.Context()
 
        conn, newchans, reqs, err := ssh.NewServerConn(netconn, &gw.sshConfig)
-       if err != nil {
+       if err == io.EOF {
+               return
+       } else if err != nil {
                gw.Log.Printf("ssh.NewServerConn: %s", err)
                return
        }
index 0e86f604a7733b1a12296c0f389386d5d352526b..c2ed37e75a43a72d6f71d0b0083e65720ad9ee0b 100644 (file)
@@ -1916,6 +1916,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        ContainerUUID: containerUUID,
                        Target:        cr.executor,
                        Log:           cr.CrunchLog,
+                       ArvadosClient: cr.dispatcherClient,
                }
                err = cr.gateway.Start()
                if err != nil {
index d76ece1eddb4560f47f9d7892313e1f9e3882746..1b303ffb414c893d0f6ada0622e12af3d34f75bb 100644 (file)
@@ -47,7 +47,8 @@ var (
        EndpointContainerDelete               = APIEndpoint{"DELETE", "arvados/v1/containers/{uuid}", ""}
        EndpointContainerLock                 = APIEndpoint{"POST", "arvados/v1/containers/{uuid}/lock", ""}
        EndpointContainerUnlock               = APIEndpoint{"POST", "arvados/v1/containers/{uuid}/unlock", ""}
-       EndpointContainerSSH                  = APIEndpoint{"GET", "arvados/v1/connect/{uuid}/ssh", ""} // move to /containers after #17014 fixes routing
+       EndpointContainerSSH                  = APIEndpoint{"POST", "arvados/v1/connect/{uuid}/ssh", ""}            // move to /containers after #17014 fixes routing
+       EndpointContainerGatewayTunnel        = APIEndpoint{"POST", "arvados/v1/connect/{uuid}/gateway_tunnel", ""} // move to /containers after #17014 fixes routing
        EndpointContainerRequestCreate        = APIEndpoint{"POST", "arvados/v1/container_requests", "container_request"}
        EndpointContainerRequestUpdate        = APIEndpoint{"PATCH", "arvados/v1/container_requests/{uuid}", "container_request"}
        EndpointContainerRequestGet           = APIEndpoint{"GET", "arvados/v1/container_requests/{uuid}", ""}
@@ -98,10 +99,18 @@ type ContainerSSHOptions struct {
        LoginUsername string `json:"login_username"`
 }
 
-type ContainerSSHConnection struct {
-       Conn   net.Conn           `json:"-"`
-       Bufrw  *bufio.ReadWriter  `json:"-"`
-       Logger logrus.FieldLogger `json:"-"`
+type ContainerSSHConnection ConnectionResponse // same fields as ConnectionResponse; as a distinct defined type it does not carry ConnectionResponse's ServeHTTP method, so convert before serving
+
+type ConnectionResponse struct { // an upgraded (non-HTTP) connection that ServeHTTP relays to the requesting client; no field is JSON-encoded
+       Conn          net.Conn           `json:"-"` // backend connection; ServeHTTP copies bytes between it and the hijacked client connection
+       Bufrw         *bufio.ReadWriter  `json:"-"` // data already buffered in its Reader is sent to the client before ServeHTTP starts reading from Conn
+       Logger        logrus.FieldLogger `json:"-"` // optional; when non-nil, ServeHTTP logs bytesIn/bytesOut ("closed connection") once both copy goroutines finish
+       UpgradeHeader string             `json:"-"` // value ServeHTTP sends in the "Upgrade" response header
+}
+
+type ContainerGatewayTunnelOptions struct { // request options for the ContainerGatewayTunnel API call
+       UUID       string `json:"uuid"` // container UUID; the gateway passes its own ContainerUUID here
+       AuthSecret string `json:"auth_secret"` // the gateway passes its AuthSecret here; presumably used by the server to authenticate the tunnel request (confirm)
 }
 
 type GetOptions struct {
@@ -255,6 +264,7 @@ type API interface {
        ContainerLock(ctx context.Context, options GetOptions) (Container, error)
        ContainerUnlock(ctx context.Context, options GetOptions) (Container, error)
        ContainerSSH(ctx context.Context, options ContainerSSHOptions) (ContainerSSHConnection, error)
+       ContainerGatewayTunnel(ctx context.Context, options ContainerGatewayTunnelOptions) (ConnectionResponse, error)
        ContainerRequestCreate(ctx context.Context, options CreateOptions) (ContainerRequest, error)
        ContainerRequestUpdate(ctx context.Context, options UpdateOptions) (ContainerRequest, error)
        ContainerRequestGet(ctx context.Context, options GetOptions) (ContainerRequest, error)
index 00c98d572ea010e2db43ca1c62bfa0eb341b1e7d..d1d512856ac9744f091c2152985a7ebc5fd81651 100644 (file)
@@ -14,14 +14,14 @@ import (
        "github.com/sirupsen/logrus"
 )
 
-func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+func (cresp ConnectionResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        hj, ok := w.(http.Hijacker)
        if !ok {
                http.Error(w, "ResponseWriter does not support connection upgrade", http.StatusInternalServerError)
                return
        }
        w.Header().Set("Connection", "upgrade")
-       w.Header().Set("Upgrade", "ssh")
+       w.Header().Set("Upgrade", cresp.UpgradeHeader)
        w.WriteHeader(http.StatusSwitchingProtocols)
        conn, bufrw, err := hj.Hijack()
        if err != nil {
@@ -37,10 +37,10 @@ func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http
        go func() {
                defer wg.Done()
                defer cancel()
-               n, err := io.CopyN(conn, sshconn.Bufrw, int64(sshconn.Bufrw.Reader.Buffered()))
+               n, err := io.CopyN(conn, cresp.Bufrw, int64(cresp.Bufrw.Reader.Buffered()))
                bytesOut += n
                if err == nil {
-                       n, err = io.Copy(conn, sshconn.Conn)
+                       n, err = io.Copy(conn, cresp.Conn)
                        bytesOut += n
                }
                if err != nil {
@@ -51,10 +51,10 @@ func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http
        go func() {
                defer wg.Done()
                defer cancel()
-               n, err := io.CopyN(sshconn.Conn, bufrw, int64(bufrw.Reader.Buffered()))
+               n, err := io.CopyN(cresp.Conn, bufrw, int64(bufrw.Reader.Buffered()))
                bytesIn += n
                if err == nil {
-                       n, err = io.Copy(sshconn.Conn, conn)
+                       n, err = io.Copy(cresp.Conn, conn)
                        bytesIn += n
                }
                if err != nil {
@@ -62,10 +62,10 @@ func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http
                }
        }()
        <-ctx.Done()
-       if sshconn.Logger != nil {
+       if cresp.Logger != nil {
                go func() {
                        wg.Wait()
-                       sshconn.Logger.WithFields(logrus.Fields{
+                       cresp.Logger.WithFields(logrus.Fields{
                                "bytesIn":  bytesIn,
                                "bytesOut": bytesOut,
                        }).Info("closed connection")
index f49d29ce2b8cbbc7b9a8c564e20d86673588f58a..d784abf6719c26c855da0d24017444e57d962328 100644 (file)
@@ -113,6 +113,10 @@ func (as *APIStub) ContainerSSH(ctx context.Context, options arvados.ContainerSS
        as.appendCall(ctx, as.ContainerSSH, options)
        return arvados.ContainerSSHConnection{}, as.Error
 }
+func (as *APIStub) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (arvados.ConnectionResponse, error) {
+       as.appendCall(ctx, as.ContainerGatewayTunnel, options) // same bookkeeping call as the neighboring stub methods; presumably records the invocation (confirm in appendCall)
+       return arvados.ConnectionResponse{}, as.Error // stub result: zero-value response plus whatever as.Error currently holds
+}
 func (as *APIStub) ContainerRequestCreate(ctx context.Context, options arvados.CreateOptions) (arvados.ContainerRequest, error) {
        as.appendCall(ctx, as.ContainerRequestCreate, options)
        return arvados.ContainerRequest{}, as.Error
index 08f87bbdb13b3a4ae21ce4d26b694ecc2dd57cef..43af0721c4b7b58faf0bdaf477e386502dad8c50 100644 (file)
@@ -498,9 +498,9 @@ class Container < ArvadosModel
       permitted.push :priority
 
     when Running
-      permitted.push :priority, :output_properties, *progress_attrs
+      permitted.push :priority, :output_properties, :gateway_address, *progress_attrs
       if self.state_changed?
-        permitted.push :started_at, :gateway_address
+        permitted.push :started_at
       end
       if !self.interactive_session_started_was
         permitted.push :interactive_session_started
index bcf99da2e3442ba088d9effbe80a633e6467f857..a4c0ce17926092ec451583404a21f56374f79176 100644 (file)
@@ -958,6 +958,7 @@ class ContainerTest < ActiveSupport::TestCase
         Thread.current[:user] = auth.user
       end
 
+      assert c.update_attributes(gateway_address: "127.0.0.1:9")
       assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
       assert c.update_attributes(runtime_status: {'warning' => 'something happened'})
       assert c.update_attributes(progress: 0.5)