From: Tom Clegg Date: Sat, 18 Jun 2022 04:34:03 +0000 (-0400) Subject: 19166: Set up tunnel for container gateway requests X-Git-Tag: 2.5.0~106^2~18 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/bdc29d3129f6d75aa9ce0a24ffb849a272b06f08 19166: Set up tunnel for container gateway requests in slurm/lsf environments, where controller doesn't know how to (and perhaps can't) connect directly to the compute node where crunch-run is running. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/go.mod b/go.mod index 525bae11ee..aced60dbc4 100644 --- a/go.mod +++ b/go.mod @@ -72,6 +72,7 @@ require ( github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/protobuf v1.5.0 // indirect github.com/googleapis/gax-go/v2 v2.0.5 // indirect + github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5 // indirect diff --git a/go.sum b/go.sum index 82a8d83d7e..422a891e00 100644 --- a/go.sum +++ b/go.sum @@ -433,6 +433,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 h1:xixZ2bWeofWV68J+x6AzmKuVM/JWCQwkWm6GW/MUR6I= +github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= diff --git a/lib/controller/federation/conn.go b/lib/controller/federation/conn.go index d9f587852d..08d3ab1a6e 100644 --- a/lib/controller/federation/conn.go +++ b/lib/controller/federation/conn.go @@ -379,6 +379,10 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH return conn.chooseBackend(options.UUID).ContainerSSH(ctx, options) } +func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (arvados.ConnectionResponse, error) { + return conn.chooseBackend(options.UUID).ContainerGatewayTunnel(ctx, options) +} + func (conn *Conn) ContainerRequestList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerRequestList, error) { return conn.generated_ContainerRequestList(ctx, options) } diff --git a/lib/controller/localdb/conn.go b/lib/controller/localdb/conn.go index 104cfe28f5..a36822ad6b 100644 --- a/lib/controller/localdb/conn.go +++ b/lib/controller/localdb/conn.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "strings" + "sync" "time" "git.arvados.org/arvados.git/lib/controller/railsproxy" @@ -18,6 +19,7 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/httpserver" + "github.com/hashicorp/yamux" "github.com/sirupsen/logrus" ) @@ -31,6 +33,8 @@ type Conn struct { lastVocabularyRefreshCheck time.Time lastVocabularyError error loginController + gwTunnels map[string]*yamux.Session + gwTunnelsLock sync.Mutex } func NewConn(cluster *arvados.Cluster) *Conn { diff --git a/lib/controller/localdb/container_gateway.go b/lib/controller/localdb/container_gateway.go index 3b40eccaff..fcfa599e4d 100644 --- a/lib/controller/localdb/container_gateway.go +++ b/lib/controller/localdb/container_gateway.go @@ -6,13 +6,16 @@ package localdb import ( "bufio" + "bytes" "context" "crypto/hmac" "crypto/sha256" + "crypto/subtle" "crypto/tls" "crypto/x509" "errors" "fmt" + "net" "net/http" "net/url" "strings" @@ -21,6 +24,7 @@ import ( "git.arvados.org/arvados.git/sdk/go/auth" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/httpserver" + "github.com/hashicorp/yamux" ) // ContainerSSH returns a connection to the SSH server in the @@ -61,19 +65,33 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt } } - switch ctr.State { - case arvados.ContainerStateQueued, arvados.ContainerStateLocked: + conn.gwTunnelsLock.Lock() + tunnel := conn.gwTunnels[opts.UUID] + conn.gwTunnelsLock.Unlock() + + if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked { err = httpserver.ErrorWithStatus(fmt.Errorf("container is not running yet (state is %q)", ctr.State), http.StatusServiceUnavailable) return - case arvados.ContainerStateRunning: - if ctr.GatewayAddress == "" { - err = httpserver.ErrorWithStatus(errors.New("container is running but gateway is not available -- installation problem or feature not supported"), http.StatusServiceUnavailable) - return - } - default: + } else if ctr.State != arvados.ContainerStateRunning { err = httpserver.ErrorWithStatus(fmt.Errorf("container has ended (state is %q)", ctr.State), http.StatusGone) return } + + var rawconn net.Conn + if ctr.GatewayAddress != "" && !strings.HasPrefix(ctr.GatewayAddress, "127.0.0.1:") { + rawconn, err = net.Dial("tcp", ctr.GatewayAddress) + } else if tunnel != nil { + rawconn, err = tunnel.Open() + } else if ctr.GatewayAddress == "" { + err = errors.New("container is running but gateway is not available") + } else { + err = errors.New("container gateway is running but tunnel is down") + } + if err != nil { + err = httpserver.ErrorWithStatus(err, http.StatusServiceUnavailable) + return + } + // crunch-run uses a self-signed / unverifiable TLS // certificate, so we use the following scheme to ensure we're // not talking to a MITM. @@ -93,7 +111,7 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt // X-Arvados-Authorization-Response header, proving that the // server knows ctrKey. var requestAuth, respondAuth string - netconn, err := tls.Dial("tcp", ctr.GatewayAddress, &tls.Config{ + tlsconn := tls.Client(rawconn, &tls.Config{ InsecureSkipVerify: true, VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error { if len(rawCerts) == 0 { @@ -111,23 +129,25 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt return nil }, }) + err = tlsconn.HandshakeContext(ctx) if err != nil { err = httpserver.ErrorWithStatus(err, http.StatusBadGateway) return } if respondAuth == "" { + tlsconn.Close() err = httpserver.ErrorWithStatus(errors.New("BUG: no respondAuth"), http.StatusInternalServerError) return } - bufr := bufio.NewReader(netconn) - bufw := bufio.NewWriter(netconn) + bufr := bufio.NewReader(tlsconn) + bufw := bufio.NewWriter(tlsconn) u := url.URL{ Scheme: "http", Host: ctr.GatewayAddress, Path: "/ssh", } - bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n") + bufw.WriteString("POST " + u.String() + " HTTP/1.1\r\n") bufw.WriteString("Host: " + u.Host + "\r\n") bufw.WriteString("Upgrade: ssh\r\n") bufw.WriteString("X-Arvados-Target-Uuid: " + opts.UUID + "\r\n") @@ -139,18 +159,18 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"}) if err != nil { err = httpserver.ErrorWithStatus(fmt.Errorf("error reading http response from gateway: %w", err), http.StatusBadGateway) - netconn.Close() + tlsconn.Close() return } if resp.Header.Get("X-Arvados-Authorization-Response") != respondAuth { err = httpserver.ErrorWithStatus(errors.New("bad X-Arvados-Authorization-Response header"), http.StatusBadGateway) - netconn.Close() + tlsconn.Close() return } if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" || strings.ToLower(resp.Header.Get("Connection")) != "upgrade" { err = httpserver.ErrorWithStatus(errors.New("bad upgrade"), http.StatusBadGateway) - netconn.Close() + tlsconn.Close() return } @@ -162,13 +182,60 @@ func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOpt }, }) if err != nil { - netconn.Close() + tlsconn.Close() return } } - sshconn.Conn = netconn + sshconn.Conn = tlsconn sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw} sshconn.Logger = ctxlog.FromContext(ctx) + sshconn.UpgradeHeader = "ssh" + return +} + +// ContainerGatewayTunnel sets up a tunnel enabling us (controller) to +// connect to the caller's (crunch-run's) gateway server. +func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, opts arvados.ContainerGatewayTunnelOptions) (resp arvados.ConnectionResponse, err error) { + h := hmac.New(sha256.New, []byte(conn.cluster.SystemRootToken)) + fmt.Fprint(h, opts.UUID) + authSecret := fmt.Sprintf("%x", h.Sum(nil)) + if subtle.ConstantTimeCompare([]byte(authSecret), []byte(opts.AuthSecret)) != 1 { + ctxlog.FromContext(ctx).Info("received incorrect auth_secret") + return resp, httpserver.ErrorWithStatus(errors.New("authentication error"), http.StatusUnauthorized) + } + + muxconn, clientconn := net.Pipe() + tunnel, err := yamux.Server(muxconn, nil) + if err != nil { + clientconn.Close() + return resp, httpserver.ErrorWithStatus(err, http.StatusInternalServerError) + } + + conn.gwTunnelsLock.Lock() + if conn.gwTunnels == nil { + conn.gwTunnels = map[string]*yamux.Session{opts.UUID: tunnel} + } else { + conn.gwTunnels[opts.UUID] = tunnel + } + conn.gwTunnelsLock.Unlock() + + go func() { + <-tunnel.CloseChan() + conn.gwTunnelsLock.Lock() + if conn.gwTunnels[opts.UUID] == tunnel { + delete(conn.gwTunnels, opts.UUID) + } + conn.gwTunnelsLock.Unlock() + }() + + // Assuming we're acting as the backend of an http server, + // lib/controller/router will call resp's ServeHTTP handler, + // which upgrades the incoming http connection to a raw socket + // and connects it to our yamux.Server through our net.Pipe(). + resp.Conn = clientconn + resp.Bufrw = &bufio.ReadWriter{Reader: bufio.NewReader(&bytes.Buffer{}), Writer: bufio.NewWriter(&bytes.Buffer{})} + resp.Logger = ctxlog.FromContext(ctx) + resp.UpgradeHeader = "tunnel" return } diff --git a/lib/controller/localdb/container_gateway_test.go b/lib/controller/localdb/container_gateway_test.go index 2717604201..b3b604e534 100644 --- a/lib/controller/localdb/container_gateway_test.go +++ b/lib/controller/localdb/container_gateway_test.go @@ -12,9 +12,11 @@ import ( "io" "io/ioutil" "net" + "net/http/httptest" "time" "git.arvados.org/arvados.git/lib/config" + "git.arvados.org/arvados.git/lib/controller/router" "git.arvados.org/arvados.git/lib/crunchrun" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadostest" @@ -55,12 +57,21 @@ func (s *ContainerGatewaySuite) SetUpSuite(c *check.C) { fmt.Fprint(h, s.ctrUUID) authKey := fmt.Sprintf("%x", h.Sum(nil)) + rtr := router.New(s.localdb, router.Config{}) + srv := httptest.NewUnstartedServer(rtr) + srv.StartTLS() + ac := &arvados.Client{ + APIHost: srv.Listener.Addr().String(), + AuthToken: arvadostest.Dispatch1Token, + Insecure: true, + } s.gw = &crunchrun.Gateway{ ContainerUUID: s.ctrUUID, AuthSecret: authKey, Address: "localhost:0", Log: ctxlog.TestLogger(c), Target: crunchrun.GatewayTargetStub{}, + ArvadosClient: ac, } c.Assert(s.gw.Start(), check.IsNil) rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}}) @@ -69,18 +80,25 @@ func (s *ContainerGatewaySuite) SetUpSuite(c *check.C) { Attrs: map[string]interface{}{ "state": arvados.ContainerStateLocked}}) c.Assert(err, check.IsNil) - _, err = s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{ +} + +func (s *ContainerGatewaySuite) SetUpTest(c *check.C) { + // clear any tunnel sessions started by previous test cases + s.localdb.gwTunnelsLock.Lock() + s.localdb.gwTunnels = nil + s.localdb.gwTunnelsLock.Unlock() + + rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}}) + _, err := s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{ UUID: s.ctrUUID, Attrs: map[string]interface{}{ "state": arvados.ContainerStateRunning, "gateway_address": s.gw.Address}}) c.Assert(err, check.IsNil) -} -func (s *ContainerGatewaySuite) SetUpTest(c *check.C) { s.cluster.Containers.ShellAccess.Admin = true s.cluster.Containers.ShellAccess.User = true - _, err := arvadostest.DB(c, s.cluster).Exec(`update containers set interactive_session_started=$1 where uuid=$2`, false, s.ctrUUID) + _, err = arvadostest.DB(c, s.cluster).Exec(`update containers set interactive_session_started=$1 where uuid=$2`, false, s.ctrUUID) c.Check(err, check.IsNil) } @@ -234,3 +252,99 @@ func (s *ContainerGatewaySuite) TestConnectFail(c *check.C) { _, err = s.localdb.ContainerSSH(ctx, arvados.ContainerSSHOptions{UUID: s.ctrUUID}) c.Check(err, check.ErrorMatches, `.* 404 .*`) } + +func (s *ContainerGatewaySuite) TestCreateTunnel(c *check.C) { + // no AuthSecret + conn, err := s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{ + UUID: s.ctrUUID, + }) + c.Check(err, check.ErrorMatches, `authentication error`) + c.Check(conn.Conn, check.IsNil) + + // bogus AuthSecret + conn, err = s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{ + UUID: s.ctrUUID, + AuthSecret: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + }) + c.Check(err, check.ErrorMatches, `authentication error`) + c.Check(conn.Conn, check.IsNil) + + // good AuthSecret + conn, err = s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{ + UUID: s.ctrUUID, + AuthSecret: s.gw.AuthSecret, + }) + c.Check(err, check.IsNil) + c.Check(conn.Conn, check.NotNil) +} + +func (s *ContainerGatewaySuite) TestConnectThroughTunnel(c *check.C) { + tungw := &crunchrun.Gateway{ + ContainerUUID: s.ctrUUID, + AuthSecret: s.gw.AuthSecret, + Log: ctxlog.TestLogger(c), + Target: crunchrun.GatewayTargetStub{}, + ArvadosClient: s.gw.ArvadosClient, + } + c.Assert(tungw.Start(), check.IsNil) + + // We didn't supply an external hostname in the Address field, + // so Start() should assign a local address. + host, _, err := net.SplitHostPort(tungw.Address) + c.Assert(err, check.IsNil) + c.Check(host, check.Equals, "127.0.0.1") + + // Set the gateway_address field to 127.0.0.1:badport to + // ensure the ContainerSSH() handler connects through the + // tunnel, rather than the gateway server on 127.0.0.1 (which + // wouldn't work IRL where controller and gateway are on + // different hosts, but would allow the test to cheat). + rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}}) + _, err = s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{ + UUID: s.ctrUUID, + Attrs: map[string]interface{}{ + "state": arvados.ContainerStateRunning, + "gateway_address": "127.0.0.1:0"}}) + c.Assert(err, check.IsNil) + + ctr, err := s.localdb.ContainerGet(s.ctx, arvados.GetOptions{UUID: s.ctrUUID}) + c.Check(err, check.IsNil) + c.Check(ctr.InteractiveSessionStarted, check.Equals, false) + c.Check(ctr.GatewayAddress, check.Equals, "127.0.0.1:0") + + c.Log("connecting to gateway through tunnel") + sshconn, err := s.localdb.ContainerSSH(s.ctx, arvados.ContainerSSHOptions{UUID: s.ctrUUID}) + c.Assert(err, check.IsNil) + c.Assert(sshconn.Conn, check.NotNil) + defer sshconn.Conn.Close() + + done := make(chan struct{}) + go func() { + defer close(done) + + // Receive text banner + buf := make([]byte, 12) + _, err := io.ReadFull(sshconn.Conn, buf) + c.Check(err, check.IsNil) + c.Check(string(buf), check.Equals, "SSH-2.0-Go\r\n") + + // Send text banner + _, err = sshconn.Conn.Write([]byte("SSH-2.0-Fake\r\n")) + c.Check(err, check.IsNil) + + // Receive binary + _, err = io.ReadFull(sshconn.Conn, buf[:4]) + c.Check(err, check.IsNil) + + // If we can get this far into an SSH handshake... + c.Logf("was able to read %x -- success, tunnel is working", buf[:4]) + }() + select { + case <-done: + case <-time.After(time.Second): + c.Fail() + } + ctr, err = s.localdb.ContainerGet(s.ctx, arvados.GetOptions{UUID: s.ctrUUID}) + c.Check(err, check.IsNil) + c.Check(ctr.InteractiveSessionStarted, check.Equals, true) +} diff --git a/lib/controller/router/router.go b/lib/controller/router/router.go index 586ea8e676..a87dbca926 100644 --- a/lib/controller/router/router.go +++ b/lib/controller/router/router.go @@ -244,6 +244,13 @@ func (rtr *router) addRoutes() { return rtr.backend.ContainerSSH(ctx, *opts.(*arvados.ContainerSSHOptions)) }, }, + { + arvados.EndpointContainerGatewayTunnel, + func() interface{} { return &arvados.ContainerGatewayTunnelOptions{} }, + func(ctx context.Context, opts interface{}) (interface{}, error) { + return rtr.backend.ContainerGatewayTunnel(ctx, *opts.(*arvados.ContainerGatewayTunnelOptions)) + }, + }, { arvados.EndpointGroupCreate, func() interface{} { return &arvados.CreateOptions{} }, diff --git a/lib/controller/rpc/conn.go b/lib/controller/rpc/conn.go index 1148068d70..8e25ca0d05 100644 --- a/lib/controller/rpc/conn.go +++ b/lib/controller/rpc/conn.go @@ -332,6 +332,39 @@ func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOption // a running container. If the returned error is nil, the caller is // responsible for closing sshconn.Conn. func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (sshconn arvados.ContainerSSHConnection, err error) { + u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1)) + if err != nil { + err = fmt.Errorf("url.Parse: %w", err) + return + } + u.RawQuery = url.Values{ + "detach_keys": {options.DetachKeys}, + "login_username": {options.LoginUsername}, + }.Encode() + resp, err := conn.socket(ctx, u, "ssh", nil) + if err != nil { + return + } + return arvados.ContainerSSHConnection(resp), nil +} + +// ContainerGatewayTunnel returns a connection to a yamux session on +// the controller. The caller should connect the returned resp.Conn to +// a client-side yamux session. +func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (tunnelconn arvados.ConnectionResponse, err error) { + u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", options.UUID, -1)) + if err != nil { + err = fmt.Errorf("url.Parse: %w", err) + return + } + return conn.socket(ctx, u, "tunnel", url.Values{ + "auth_secret": {options.AuthSecret}, + }) +} + +// socket sets up a socket using the specified API endpoint and +// upgrade header. +func (conn *Conn) socket(ctx context.Context, u *url.URL, upgradeHeader string, postform url.Values) (connresp arvados.ConnectionResponse, err error) { addr := conn.baseURL.Host if strings.Index(addr, ":") < 1 || (strings.Contains(addr, "::") && addr[0] != '[') { // hostname or ::1 or 1::1 @@ -354,15 +387,6 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH bufr := bufio.NewReader(netconn) bufw := bufio.NewWriter(netconn) - u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1)) - if err != nil { - err = fmt.Errorf("tls.Dial: %w", err) - return - } - u.RawQuery = url.Values{ - "detach_keys": {options.DetachKeys}, - "login_username": {options.LoginUsername}, - }.Encode() tokens, err := conn.tokenProvider(ctx) if err != nil { return @@ -370,11 +394,17 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH err = httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized) return } - bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n") + postdata := postform.Encode() + bufw.WriteString("POST " + u.String() + " HTTP/1.1\r\n") bufw.WriteString("Authorization: Bearer " + tokens[0] + "\r\n") bufw.WriteString("Host: " + u.Host + "\r\n") - bufw.WriteString("Upgrade: ssh\r\n") + bufw.WriteString("Upgrade: " + upgradeHeader + "\r\n") + bufw.WriteString("Content-Type: application/x-www-form-urlencoded\r\n") + fmt.Fprintf(bufw, "Content-Length: %d\r\n", len(postdata)) bufw.WriteString("\r\n") + if len(postdata) > 0 { + bufw.WriteString(postdata) + } bufw.Flush() resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"}) if err != nil { @@ -394,13 +424,13 @@ func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSH err = fmt.Errorf("server did not provide a tunnel: %s (HTTP %d)", message, resp.StatusCode) return } - if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" || + if strings.ToLower(resp.Header.Get("Upgrade")) != upgradeHeader || strings.ToLower(resp.Header.Get("Connection")) != "upgrade" { err = fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection")) return } - sshconn.Conn = netconn - sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw} + connresp.Conn = netconn + connresp.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw} return } diff --git a/lib/crunchrun/container_gateway.go b/lib/crunchrun/container_gateway.go index 01457015e1..49eb68c0f5 100644 --- a/lib/crunchrun/container_gateway.go +++ b/lib/crunchrun/container_gateway.go @@ -14,16 +14,22 @@ import ( "io" "net" "net/http" + "net/url" "os" "os/exec" "sync" "syscall" + "time" + "git.arvados.org/arvados.git/lib/controller/rpc" "git.arvados.org/arvados.git/lib/selfsigned" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/auth" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/httpserver" "github.com/creack/pty" "github.com/google/shlex" + "github.com/hashicorp/yamux" "golang.org/x/crypto/ssh" "golang.org/x/net/context" ) @@ -45,12 +51,27 @@ func (GatewayTargetStub) InjectCommand(ctx context.Context, detachKeys, username type Gateway struct { ContainerUUID string - Address string // listen host:port; if port=0, Start() will change it to the selected port - AuthSecret string - Target GatewayTarget - Log interface { + // Caller should set Address to "", or "host:0" or "host:port" + // where host is a known external IP address; port is a + // desired port number to listen on; and ":0" chooses an + // available dynamic port. + // + // If Address is "", Start() listens only on the loopback + // interface (and changes Address to "127.0.0.1:port"). + // Otherwise it listens on all interfaces. + // + // If Address is "host:0", Start() updates Address to + // "host:port". + Address string + AuthSecret string + Target GatewayTarget + Log interface { Printf(fmt string, args ...interface{}) } + // If non-nil, set up a ContainerGatewayTunnel, so that the + // controller can connect to us even if our external IP + // address is unknown or not routable from controller. + ArvadosClient *arvados.Client sshConfig ssh.ServerConfig requestAuth string @@ -99,7 +120,22 @@ func (gw *Gateway) Start() error { // from arvados-controller, and PORT is either the desired // port where we should run our gateway server, or "0" if we // should choose an available port. - host, port, err := net.SplitHostPort(gw.Address) + extAddr := gw.Address + // Generally we can't know which local interface corresponds + // to an externally reachable IP address, so if we expect to + // be reachable by external hosts, we listen on all + // interfaces. + listenHost := "" + if extAddr == "" { + // If the dispatcher doesn't tell us our external IP + // address, controller will only be able to connect + // through the tunnel (see runTunnel), so our gateway + // server only needs to listen on the loopback + // interface. + extAddr = "127.0.0.1:0" + listenHost = "127.0.0.1" + } + extHost, extPort, err := net.SplitHostPort(extAddr) if err != nil { return err } @@ -121,26 +157,87 @@ func (gw *Gateway) Start() error { Certificates: []tls.Certificate{cert}, }, }, - Addr: ":" + port, + Addr: net.JoinHostPort(listenHost, extPort), } err = srv.Start() if err != nil { return err } - // Get the port number we are listening on (the port might be + // Get the port number we are listening on (extPort might be // "0" or a port name, in which case this will be different). - _, port, err = net.SplitHostPort(srv.Addr) + _, listenPort, err := net.SplitHostPort(srv.Addr) if err != nil { return err } - // When changing state to Running, we will set - // gateway_address to "HOST:PORT" where HOST is our - // external hostname/IP as provided by arvados-dispatch-cloud, - // and PORT is the port number we ended up listening on. - gw.Address = net.JoinHostPort(host, port) + // When changing state to Running, the caller will want to set + // gateway_address to a "HOST:PORT" that, if controller + // connects to it, will reach this gateway server. + // + // The most likely thing to work is: HOST is our external + // hostname/IP as provided by the caller + // (arvados-dispatch-cloud) or 127.0.0.1 to indicate + // non-tunnel connections aren't available; and PORT is the + // port number we are listening on. + gw.Address = net.JoinHostPort(extHost, listenPort) + if gw.ArvadosClient != nil { + go gw.maintainTunnel(gw.Address) + } return nil } +func (gw *Gateway) maintainTunnel(addr string) { + for ; ; time.Sleep(5 * time.Second) { + err := gw.runTunnel(addr) + gw.Log.Printf("runTunnel: %s", err) + } +} + +// runTunnel connects to controller and sets up a tunnel through +// which controller can connect to the gateway server at the given +// addr. +func (gw *Gateway) runTunnel(addr string) error { + ctx := auth.NewContext(context.Background(), auth.NewCredentials(gw.ArvadosClient.AuthToken)) + arpc := rpc.NewConn("", &url.URL{Scheme: "https", Host: gw.ArvadosClient.APIHost}, gw.ArvadosClient.Insecure, rpc.PassthroughTokenProvider) + tun, err := arpc.ContainerGatewayTunnel(ctx, arvados.ContainerGatewayTunnelOptions{ + UUID: gw.ContainerUUID, + AuthSecret: gw.AuthSecret, + }) + if err != nil { + return fmt.Errorf("error creating gateway tunnel: %s", err) + } + mux, err := yamux.Client(tun.Conn, nil) + if err != nil { + return fmt.Errorf("error setting up mux client end: %s", err) + } + for { + muxconn, err := mux.Accept() + if err != nil { + return err + } + gw.Log.Printf("receiving connection from tunnel, remoteAddr %s", muxconn.RemoteAddr().String()) + go func() { + defer muxconn.Close() + gwconn, err := net.Dial("tcp", addr) + if err != nil { + gw.Log.Printf("error connecting to %s on behalf of tunnel connection: %s", addr, err) + return + } + defer gwconn.Close() + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + io.Copy(gwconn, muxconn) + }() + go func() { + defer wg.Done() + io.Copy(muxconn, gwconn) + }() + wg.Wait() + }() + } +} + // handleSSH connects to an SSH server that allows the caller to run // interactive commands as root (or any other desired user) inside the // container. The tunnel itself can only be created by an @@ -166,7 +263,7 @@ func (gw *Gateway) handleSSH(w http.ResponseWriter, req *http.Request) { // In future we'll handle browser traffic too, but for now the // only traffic we expect is an SSH tunnel from // (*lib/controller/localdb.Conn)ContainerSSH() - if req.Method != "GET" || req.Header.Get("Upgrade") != "ssh" { + if req.Method != "POST" || req.Header.Get("Upgrade") != "ssh" { http.Error(w, "path not found", http.StatusNotFound) return } @@ -204,7 +301,9 @@ func (gw *Gateway) handleSSH(w http.ResponseWriter, req *http.Request) { ctx := req.Context() conn, newchans, reqs, err := ssh.NewServerConn(netconn, &gw.sshConfig) - if err != nil { + if err == io.EOF { + return + } else if err != nil { gw.Log.Printf("ssh.NewServerConn: %s", err) return } diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 0e86f604a7..c2ed37e75a 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -1916,6 +1916,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s ContainerUUID: containerUUID, Target: cr.executor, Log: cr.CrunchLog, + ArvadosClient: cr.dispatcherClient, } err = cr.gateway.Start() if err != nil { diff --git a/sdk/go/arvados/api.go b/sdk/go/arvados/api.go index d76ece1edd..1b303ffb41 100644 --- a/sdk/go/arvados/api.go +++ b/sdk/go/arvados/api.go @@ -47,7 +47,8 @@ var ( EndpointContainerDelete = APIEndpoint{"DELETE", "arvados/v1/containers/{uuid}", ""} EndpointContainerLock = APIEndpoint{"POST", "arvados/v1/containers/{uuid}/lock", ""} EndpointContainerUnlock = APIEndpoint{"POST", "arvados/v1/containers/{uuid}/unlock", ""} - EndpointContainerSSH = APIEndpoint{"GET", "arvados/v1/connect/{uuid}/ssh", ""} // move to /containers after #17014 fixes routing + EndpointContainerSSH = APIEndpoint{"POST", "arvados/v1/connect/{uuid}/ssh", ""} // move to /containers after #17014 fixes routing + EndpointContainerGatewayTunnel = APIEndpoint{"POST", "arvados/v1/connect/{uuid}/gateway_tunnel", ""} // move to /containers after #17014 fixes routing EndpointContainerRequestCreate = APIEndpoint{"POST", "arvados/v1/container_requests", "container_request"} EndpointContainerRequestUpdate = APIEndpoint{"PATCH", "arvados/v1/container_requests/{uuid}", "container_request"} EndpointContainerRequestGet = APIEndpoint{"GET", "arvados/v1/container_requests/{uuid}", ""} @@ -98,10 +99,18 @@ type ContainerSSHOptions struct { LoginUsername string `json:"login_username"` } -type ContainerSSHConnection struct { - Conn net.Conn `json:"-"` - Bufrw *bufio.ReadWriter `json:"-"` - Logger logrus.FieldLogger `json:"-"` +type ContainerSSHConnection ConnectionResponse + +type ConnectionResponse struct { + Conn net.Conn `json:"-"` + Bufrw *bufio.ReadWriter `json:"-"` + Logger logrus.FieldLogger `json:"-"` + UpgradeHeader string `json:"-"` +} + +type ContainerGatewayTunnelOptions struct { + UUID string `json:"uuid"` + AuthSecret string `json:"auth_secret"` } type GetOptions struct { @@ -255,6 +264,7 @@ type API interface { ContainerLock(ctx context.Context, options GetOptions) (Container, error) ContainerUnlock(ctx context.Context, options GetOptions) (Container, error) ContainerSSH(ctx context.Context, options ContainerSSHOptions) (ContainerSSHConnection, error) + ContainerGatewayTunnel(ctx context.Context, options ContainerGatewayTunnelOptions) (ConnectionResponse, error) ContainerRequestCreate(ctx context.Context, options CreateOptions) (ContainerRequest, error) ContainerRequestUpdate(ctx context.Context, options UpdateOptions) (ContainerRequest, error) ContainerRequestGet(ctx context.Context, options GetOptions) (ContainerRequest, error) diff --git a/sdk/go/arvados/container_gateway.go b/sdk/go/arvados/container_gateway.go index 00c98d572e..d1d512856a 100644 --- a/sdk/go/arvados/container_gateway.go +++ b/sdk/go/arvados/container_gateway.go @@ -14,14 +14,14 @@ import ( "github.com/sirupsen/logrus" ) -func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http.Request) { +func (cresp ConnectionResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) { hj, ok := w.(http.Hijacker) if !ok { http.Error(w, "ResponseWriter does not support connection upgrade", http.StatusInternalServerError) return } w.Header().Set("Connection", "upgrade") - w.Header().Set("Upgrade", "ssh") + w.Header().Set("Upgrade", cresp.UpgradeHeader) w.WriteHeader(http.StatusSwitchingProtocols) conn, bufrw, err := hj.Hijack() if err != nil { @@ -37,10 +37,10 @@ func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http go func() { defer wg.Done() defer cancel() - n, err := io.CopyN(conn, sshconn.Bufrw, int64(sshconn.Bufrw.Reader.Buffered())) + n, err := io.CopyN(conn, cresp.Bufrw, int64(cresp.Bufrw.Reader.Buffered())) bytesOut += n if err == nil { - n, err = io.Copy(conn, sshconn.Conn) + n, err = io.Copy(conn, cresp.Conn) bytesOut += n } if err != nil { @@ -51,10 +51,10 @@ func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http go func() { defer wg.Done() defer cancel() - n, err := io.CopyN(sshconn.Conn, bufrw, int64(bufrw.Reader.Buffered())) + n, err := io.CopyN(cresp.Conn, bufrw, int64(bufrw.Reader.Buffered())) bytesIn += n if err == nil { - n, err = io.Copy(sshconn.Conn, conn) + n, err = io.Copy(cresp.Conn, conn) bytesIn += n } if err != nil { @@ -62,10 +62,10 @@ func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http } }() <-ctx.Done() - if sshconn.Logger != nil { + if cresp.Logger != nil { go func() { wg.Wait() - sshconn.Logger.WithFields(logrus.Fields{ + cresp.Logger.WithFields(logrus.Fields{ "bytesIn": bytesIn, "bytesOut": bytesOut, }).Info("closed connection") diff --git a/sdk/go/arvadostest/api.go b/sdk/go/arvadostest/api.go index f49d29ce2b..d784abf671 100644 --- a/sdk/go/arvadostest/api.go +++ b/sdk/go/arvadostest/api.go @@ -113,6 +113,10 @@ func (as *APIStub) ContainerSSH(ctx context.Context, options arvados.ContainerSS as.appendCall(ctx, as.ContainerSSH, options) return arvados.ContainerSSHConnection{}, as.Error } +func (as *APIStub) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (arvados.ConnectionResponse, error) { + as.appendCall(ctx, as.ContainerGatewayTunnel, options) + return arvados.ConnectionResponse{}, as.Error +} func (as *APIStub) ContainerRequestCreate(ctx context.Context, options arvados.CreateOptions) (arvados.ContainerRequest, error) { as.appendCall(ctx, as.ContainerRequestCreate, options) return arvados.ContainerRequest{}, as.Error diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb index 08f87bbdb1..43af0721c4 100644 --- a/services/api/app/models/container.rb +++ b/services/api/app/models/container.rb @@ -498,9 +498,9 @@ class Container < ArvadosModel permitted.push :priority when Running - permitted.push :priority, :output_properties, *progress_attrs + permitted.push :priority, :output_properties, :gateway_address, *progress_attrs if self.state_changed? - permitted.push :started_at, :gateway_address + permitted.push :started_at end if !self.interactive_session_started_was permitted.push :interactive_session_started diff --git a/services/api/test/unit/container_test.rb b/services/api/test/unit/container_test.rb index bcf99da2e3..a4c0ce1792 100644 --- a/services/api/test/unit/container_test.rb +++ b/services/api/test/unit/container_test.rb @@ -958,6 +958,7 @@ class ContainerTest < ActiveSupport::TestCase Thread.current[:user] = auth.user end + assert c.update_attributes(gateway_address: "127.0.0.1:9") assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash) assert c.update_attributes(runtime_status: {'warning' => 'something happened'}) assert c.update_attributes(progress: 0.5)