fmt.Fprintf(stderr, "target UUID is not a container or container request UUID: %s\n", targetUUID)
return 1
}
- sshconn, err := rpcconn.ContainerSSH(context.TODO(), arvados.ContainerSSHOptions{
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ sshconn, err := rpcconn.ContainerSSH(ctx, arvados.ContainerSSHOptions{
UUID: targetUUID,
DetachKeys: *detachKeys,
LoginUsername: loginUsername,
return 0
}
- ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
_, err := io.Copy(stdout, sshconn.Conn)
"git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
check "gopkg.in/check.v1"
)
ContainerUUID: uuid,
Address: "0.0.0.0:0",
AuthSecret: authSecret,
+ Log: ctxlog.TestLogger(c),
// Just forward connections to localhost instead of a
// container, so we can test without running a
// container.
cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
cmd.Stdout = &stdout
cmd.Stderr = &stderr
+ stdin, err := cmd.StdinPipe()
+ c.Assert(err, check.IsNil)
+ go fmt.Fprintln(stdin, "data appears on stdin, but stdin does not close; cmd should exit anyway, not hang")
+ time.AfterFunc(5*time.Second, func() {
+ c.Errorf("timed out -- remote end is probably hung waiting for us to close stdin")
+ stdin.Close()
+ })
c.Check(cmd.Run(), check.IsNil)
c.Check(stdout.String(), check.Equals, "ok\n")
"git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/health"
+ dispatchslurm "git.arvados.org/arvados.git/services/crunch-dispatch-slurm"
"git.arvados.org/arvados.git/services/githttpd"
keepbalance "git.arvados.org/arvados.git/services/keep-balance"
keepweb "git.arvados.org/arvados.git/services/keep-web"
"crunch-run": crunchrun.Command,
"dispatch-cloud": dispatchcloud.Command,
"dispatch-lsf": lsf.DispatchCommand,
+ "dispatch-slurm": dispatchslurm.Command,
"git-httpd": githttpd.Command,
"health": healthCommand,
"install": install.Command,
- Computation with Crunch:
- api/execution.html.textile.liquid
- architecture/dispatchcloud.html.textile.liquid
+ - architecture/hpc.html.textile.liquid
- architecture/singularity.html.textile.liquid
- Other:
- api/permission-model.html.textile.liquid
table(table table-bordered table-condensed).
|_.Service |_.ExternalURL required? |_.InternalURLs required?|_.InternalURLs must be reachable from other cluster nodes?|_.Note|
|railsapi |no |yes|no ^1^|InternalURLs only used by Controller|
-|controller |yes |yes|no ^2^|InternalURLs only used by reverse proxy (e.g. Nginx)|
+|controller |yes |yes|yes ^2,4^|InternalURLs used by reverse proxy and container shell connections|
|arvados-dispatch-cloud|no |yes|no ^3^|InternalURLs only used to expose Prometheus metrics|
|arvados-dispatch-lsf|no |yes|no ^3^|InternalURLs only used to expose Prometheus metrics|
|git-http |yes |yes|no ^2^|InternalURLs only used by reverse proxy (e.g. Nginx)|
^1^ If @Controller@ runs on a different host than @RailsAPI@, the @InternalURLs@ will need to be reachable from the host that runs @Controller@.
^2^ If the reverse proxy (e.g. Nginx) does not run on the same host as the Arvados service it fronts, the @InternalURLs@ will need to be reachable from the host that runs the reverse proxy.
^3^ If the Prometheus metrics are not collected from the same machine that runs the service, the @InternalURLs@ will need to be reachable from the host that collects the metrics.
+^4^ If containers are dispatched to HPC (Slurm/LSF) and there are multiple @Controller@ services, they must be able to connect to one another using their @InternalURLs@; otherwise, the "tunnel connections":{{site.baseurl}}/architecture/hpc.html enabling "container shell access":{{site.baseurl}}/install/container-shell-access.html will not work.
When @InternalURLs@ do not need to be reachable from other nodes, it is most secure to use loopback addresses as @InternalURLs@, e.g. @http://127.0.0.1:9005@.
--- /dev/null
+---
+layout: default
+navsection: architecture
+title: Dispatching containers to HPC
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Arvados can be configured to run containers on an HPC cluster using Slurm or LSF, as an alternative to "dispatching to cloud VMs":dispatchcloud.html.
+
+In this configuration, the appropriate Arvados dispatcher service -- @crunch-dispatch-slurm@ or @arvados-dispatch-lsf@ -- picks up each container as it appears in the Arvados queue and submits a short shell script as a batch job to the HPC job queue. The shell script executes the @crunch-run@ container supervisor, which retrieves the container specification from the Arvados controller, starts an arv-mount process, runs the container using @docker exec@ or @singularity exec@, and sends updates (logs, outputs, exit code, etc.) back to the Arvados controller.
+
+h2. Container communication channel (reverse https tunnel)
+
+The crunch-run program runs a gateway server to facilitate the “container shell” feature. However, depending on the site's network topology, the Arvados controller may not be able to connect directly to the compute node where a given crunch-run process is running.
+
+Instead, in the HPC configuration, crunch-run connects to the Arvados controller at startup and sets up a multiplexed tunnel, allowing the controller process to connect to crunch-run's gateway server without initiating a connection to the compute node, or even knowing the compute node's IP address.
+
+This means that when a client requests a container shell connection, the traffic goes through two or three servers:
+# The client connects to a controller host C1.
+# If the multiplexed tunnel is connected to a different controller host C2, then C1 proxies the incoming request to C2, using C2's InternalURL.
+# The controller host (C1 or C2) uses the multiplexed tunnel to connect to crunch-run's container gateway.
+
+h2. Scaling
+
+The @API.MaxConcurrentRequests@ configuration should not be set too low, or the long-lived tunnel connections can starve other clients.
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.0 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
+ github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5 // indirect
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 h1:xixZ2bWeofWV68J+x6AzmKuVM/JWCQwkWm6GW/MUR6I=
+github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
return conn.chooseBackend(options.UUID).ContainerUnlock(ctx, options)
}
-func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ContainerSSHConnection, error) {
+func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ConnectionResponse, error) {
return conn.chooseBackend(options.UUID).ContainerSSH(ctx, options)
}
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (arvados.ConnectionResponse, error) {
+ return conn.chooseBackend(options.UUID).ContainerGatewayTunnel(ctx, options)
+}
+
func (conn *Conn) ContainerRequestList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerRequestList, error) {
return conn.generated_ContainerRequestList(ctx, options)
}
"net/http"
"os"
"strings"
+ "sync"
"time"
"git.arvados.org/arvados.git/lib/controller/railsproxy"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
+ "github.com/hashicorp/yamux"
"github.com/sirupsen/logrus"
)
lastVocabularyRefreshCheck time.Time
lastVocabularyError error
loginController
+ gwTunnels map[string]*yamux.Session
+ gwTunnelsLock sync.Mutex
}
func NewConn(cluster *arvados.Cluster) *Conn {
import (
"bufio"
+ "bytes"
"context"
"crypto/hmac"
"crypto/sha256"
+ "crypto/subtle"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
+ "io"
+ "io/ioutil"
+ "net"
"net/http"
"net/url"
"strings"
+ "git.arvados.org/arvados.git/lib/controller/rpc"
+ "git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/auth"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
+ "github.com/hashicorp/yamux"
+)
+
+var (
+ forceProxyForTest = false
+ forceInternalURLForTest *arvados.URL
)
// ContainerSSH returns a connection to the SSH server in the
//
// If the returned error is nil, the caller is responsible for closing
// sshconn.Conn.
-func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOptions) (sshconn arvados.ContainerSSHConnection, err error) {
+func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOptions) (sshconn arvados.ConnectionResponse, err error) {
user, err := conn.railsProxy.UserGetCurrent(ctx, arvados.GetOptions{})
if err != nil {
- return
+ return sshconn, err
}
ctr, err := conn.railsProxy.ContainerGet(ctx, arvados.GetOptions{UUID: opts.UUID})
if err != nil {
- return
+ return sshconn, err
}
ctxRoot := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{conn.cluster.SystemRootToken}})
if !user.IsAdmin || !conn.cluster.Containers.ShellAccess.Admin {
if !conn.cluster.Containers.ShellAccess.User {
- err = httpserver.ErrorWithStatus(errors.New("shell access is disabled in config"), http.StatusServiceUnavailable)
- return
+ return sshconn, httpserver.ErrorWithStatus(errors.New("shell access is disabled in config"), http.StatusServiceUnavailable)
}
- var crs arvados.ContainerRequestList
- crs, err = conn.railsProxy.ContainerRequestList(ctxRoot, arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"container_uuid", "=", opts.UUID}}})
+ crs, err := conn.railsProxy.ContainerRequestList(ctxRoot, arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"container_uuid", "=", opts.UUID}}})
if err != nil {
- return
+ return sshconn, err
}
for _, cr := range crs.Items {
if cr.ModifiedByUserUUID != user.UUID {
- err = httpserver.ErrorWithStatus(errors.New("permission denied: container is associated with requests submitted by other users"), http.StatusForbidden)
- return
+ return sshconn, httpserver.ErrorWithStatus(errors.New("permission denied: container is associated with requests submitted by other users"), http.StatusForbidden)
}
}
if crs.ItemsAvailable != len(crs.Items) {
- err = httpserver.ErrorWithStatus(errors.New("incomplete response while checking permission"), http.StatusInternalServerError)
- return
+ return sshconn, httpserver.ErrorWithStatus(errors.New("incomplete response while checking permission"), http.StatusInternalServerError)
}
}
- switch ctr.State {
- case arvados.ContainerStateQueued, arvados.ContainerStateLocked:
- err = httpserver.ErrorWithStatus(fmt.Errorf("container is not running yet (state is %q)", ctr.State), http.StatusServiceUnavailable)
- return
- case arvados.ContainerStateRunning:
- if ctr.GatewayAddress == "" {
- err = httpserver.ErrorWithStatus(errors.New("container is running but gateway is not available -- installation problem or feature not supported"), http.StatusServiceUnavailable)
- return
+ conn.gwTunnelsLock.Lock()
+ tunnel := conn.gwTunnels[opts.UUID]
+ conn.gwTunnelsLock.Unlock()
+
+ if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked {
+ return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("container is not running yet (state is %q)", ctr.State), http.StatusServiceUnavailable)
+ } else if ctr.State != arvados.ContainerStateRunning {
+ return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("container has ended (state is %q)", ctr.State), http.StatusGone)
+ }
+
+ // targetHost is the value we'll use in the Host header in our
+ // "Upgrade: ssh" http request. It's just a placeholder
+ // "localhost", unless we decide to connect directly, in which
+ // case we'll set it to the gateway's external ip:port. (The
+ // gateway doesn't even look at it, but we might as well.)
+ targetHost := "localhost"
+ myURL, _ := service.URLFromContext(ctx)
+
+ var rawconn net.Conn
+ if host, _, splitErr := net.SplitHostPort(ctr.GatewayAddress); splitErr == nil && host != "" && host != "127.0.0.1" {
+ // If crunch-run provided a GatewayAddress like
+ // "ipaddr:port", that means "ipaddr" is one of the
+ // external interfaces where the gateway is
+ // listening. In that case, it's the most
+ // reliable/direct option, so we use it even if a
+ // tunnel might also be available.
+ targetHost = ctr.GatewayAddress
+ rawconn, err = net.Dial("tcp", ctr.GatewayAddress)
+ if err != nil {
+ return sshconn, httpserver.ErrorWithStatus(err, http.StatusServiceUnavailable)
}
- default:
- err = httpserver.ErrorWithStatus(fmt.Errorf("container has ended (state is %q)", ctr.State), http.StatusGone)
- return
+ } else if tunnel != nil && !(forceProxyForTest && !opts.NoForward) {
+ // If we can't connect directly, and the gateway has
+ // established a yamux tunnel with us, connect through
+ // the tunnel.
+ //
+ // ...except: forceProxyForTest means we are emulating
+ // a situation where the gateway has established a
+ // yamux tunnel with controller B, and the
+ // ContainerSSH request arrives at controller A. If
+ // opts.NoForward==false then we are acting as A, so
+ // we pretend not to have a tunnel, and fall through
+ // to the "tunurl" case below. If opts.NoForward==true
+ // then the client is A and we are acting as B, so we
+ // connect to our tunnel.
+ rawconn, err = tunnel.Open()
+ if err != nil {
+ return sshconn, httpserver.ErrorWithStatus(err, http.StatusServiceUnavailable)
+ }
+ } else if ctr.GatewayAddress == "" {
+ return sshconn, httpserver.ErrorWithStatus(errors.New("container is running but gateway is not available"), http.StatusServiceUnavailable)
+ } else if tunurl := strings.TrimPrefix(ctr.GatewayAddress, "tunnel "); tunurl != ctr.GatewayAddress &&
+ tunurl != "" &&
+ tunurl != myURL.String() &&
+ !opts.NoForward {
+ // If crunch-run provided a GatewayAddress like
+ // "tunnel https://10.0.0.10:1010/", that means the
+ // gateway has established a yamux tunnel with the
+ // controller process at the indicated InternalURL
+ // (which isn't us, otherwise we would have had
+ // "tunnel != nil" above). We need to proxy through to
+ // the other controller process in order to use the
+ // tunnel.
+ for u := range conn.cluster.Services.Controller.InternalURLs {
+ if u.String() == tunurl {
+ ctxlog.FromContext(ctx).Debugf("proxying ContainerSSH request to other controller at %s", u)
+ u := url.URL(u)
+ arpc := rpc.NewConn(conn.cluster.ClusterID, &u, conn.cluster.TLS.Insecure, rpc.PassthroughTokenProvider)
+ opts.NoForward = true
+ return arpc.ContainerSSH(ctx, opts)
+ }
+ }
+ ctxlog.FromContext(ctx).Warnf("container gateway provided a tunnel endpoint %s that is not one of Services.Controller.InternalURLs", tunurl)
+ return sshconn, httpserver.ErrorWithStatus(errors.New("container gateway is running but tunnel endpoint is invalid"), http.StatusServiceUnavailable)
+ } else {
+ return sshconn, httpserver.ErrorWithStatus(errors.New("container gateway is running but tunnel is down"), http.StatusServiceUnavailable)
}
+
// crunch-run uses a self-signed / unverifiable TLS
// certificate, so we use the following scheme to ensure we're
// not talking to a MITM.
// X-Arvados-Authorization-Response header, proving that the
// server knows ctrKey.
var requestAuth, respondAuth string
- netconn, err := tls.Dial("tcp", ctr.GatewayAddress, &tls.Config{
+ tlsconn := tls.Client(rawconn, &tls.Config{
InsecureSkipVerify: true,
VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
if len(rawCerts) == 0 {
return nil
},
})
+ err = tlsconn.HandshakeContext(ctx)
if err != nil {
- err = httpserver.ErrorWithStatus(err, http.StatusBadGateway)
- return
+ return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("TLS handshake failed: %w", err), http.StatusBadGateway)
}
if respondAuth == "" {
- err = httpserver.ErrorWithStatus(errors.New("BUG: no respondAuth"), http.StatusInternalServerError)
- return
+ tlsconn.Close()
+ return sshconn, httpserver.ErrorWithStatus(errors.New("BUG: no respondAuth"), http.StatusInternalServerError)
}
- bufr := bufio.NewReader(netconn)
- bufw := bufio.NewWriter(netconn)
+ bufr := bufio.NewReader(tlsconn)
+ bufw := bufio.NewWriter(tlsconn)
u := url.URL{
Scheme: "http",
- Host: ctr.GatewayAddress,
+ Host: targetHost,
Path: "/ssh",
}
- bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n")
+ postform := url.Values{
+ "uuid": {opts.UUID},
+ "detach_keys": {opts.DetachKeys},
+ "login_username": {opts.LoginUsername},
+ "no_forward": {fmt.Sprintf("%v", opts.NoForward)},
+ }
+ postdata := postform.Encode()
+ bufw.WriteString("POST " + u.String() + " HTTP/1.1\r\n")
bufw.WriteString("Host: " + u.Host + "\r\n")
bufw.WriteString("Upgrade: ssh\r\n")
- bufw.WriteString("X-Arvados-Target-Uuid: " + opts.UUID + "\r\n")
bufw.WriteString("X-Arvados-Authorization: " + requestAuth + "\r\n")
- bufw.WriteString("X-Arvados-Detach-Keys: " + opts.DetachKeys + "\r\n")
- bufw.WriteString("X-Arvados-Login-Username: " + opts.LoginUsername + "\r\n")
+ bufw.WriteString("Content-Type: application/x-www-form-urlencoded\r\n")
+ fmt.Fprintf(bufw, "Content-Length: %d\r\n", len(postdata))
bufw.WriteString("\r\n")
+ bufw.WriteString(postdata)
bufw.Flush()
- resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"})
+ resp, err := http.ReadResponse(bufr, &http.Request{Method: "POST"})
if err != nil {
- err = httpserver.ErrorWithStatus(fmt.Errorf("error reading http response from gateway: %w", err), http.StatusBadGateway)
- netconn.Close()
- return
+ tlsconn.Close()
+ return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("error reading http response from gateway: %w", err), http.StatusBadGateway)
}
- if resp.Header.Get("X-Arvados-Authorization-Response") != respondAuth {
- err = httpserver.ErrorWithStatus(errors.New("bad X-Arvados-Authorization-Response header"), http.StatusBadGateway)
- netconn.Close()
- return
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusSwitchingProtocols {
+ body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 1000))
+ tlsconn.Close()
+ return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("unexpected status %s %q", resp.Status, body), http.StatusBadGateway)
}
if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" ||
strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {
- err = httpserver.ErrorWithStatus(errors.New("bad upgrade"), http.StatusBadGateway)
- netconn.Close()
- return
+ tlsconn.Close()
+ return sshconn, httpserver.ErrorWithStatus(errors.New("bad upgrade"), http.StatusBadGateway)
+ }
+ if resp.Header.Get("X-Arvados-Authorization-Response") != respondAuth {
+ tlsconn.Close()
+ return sshconn, httpserver.ErrorWithStatus(errors.New("bad X-Arvados-Authorization-Response header"), http.StatusBadGateway)
}
if !ctr.InteractiveSessionStarted {
},
})
if err != nil {
- netconn.Close()
- return
+ tlsconn.Close()
+ return sshconn, httpserver.ErrorWithStatus(err, http.StatusInternalServerError)
}
}
- sshconn.Conn = netconn
+ sshconn.Conn = tlsconn
sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
sshconn.Logger = ctxlog.FromContext(ctx)
+ sshconn.Header = http.Header{"Upgrade": {"ssh"}}
+ return sshconn, nil
+}
+
+// ContainerGatewayTunnel sets up a tunnel enabling us (controller) to
+// connect to the caller's (crunch-run's) gateway server.
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, opts arvados.ContainerGatewayTunnelOptions) (resp arvados.ConnectionResponse, err error) {
+ h := hmac.New(sha256.New, []byte(conn.cluster.SystemRootToken))
+ fmt.Fprint(h, opts.UUID)
+ authSecret := fmt.Sprintf("%x", h.Sum(nil))
+ if subtle.ConstantTimeCompare([]byte(authSecret), []byte(opts.AuthSecret)) != 1 {
+ ctxlog.FromContext(ctx).Info("received incorrect auth_secret")
+ return resp, httpserver.ErrorWithStatus(errors.New("authentication error"), http.StatusUnauthorized)
+ }
+
+ muxconn, clientconn := net.Pipe()
+ tunnel, err := yamux.Server(muxconn, nil)
+ if err != nil {
+ clientconn.Close()
+ return resp, httpserver.ErrorWithStatus(err, http.StatusInternalServerError)
+ }
+
+ conn.gwTunnelsLock.Lock()
+ if conn.gwTunnels == nil {
+ conn.gwTunnels = map[string]*yamux.Session{opts.UUID: tunnel}
+ } else {
+ conn.gwTunnels[opts.UUID] = tunnel
+ }
+ conn.gwTunnelsLock.Unlock()
+
+ go func() {
+ <-tunnel.CloseChan()
+ conn.gwTunnelsLock.Lock()
+ if conn.gwTunnels[opts.UUID] == tunnel {
+ delete(conn.gwTunnels, opts.UUID)
+ }
+ conn.gwTunnelsLock.Unlock()
+ }()
+
+ // Assuming we're acting as the backend of an http server,
+ // lib/controller/router will call resp's ServeHTTP handler,
+ // which upgrades the incoming http connection to a raw socket
+ // and connects it to our yamux.Server through our net.Pipe().
+ resp.Conn = clientconn
+ resp.Bufrw = &bufio.ReadWriter{Reader: bufio.NewReader(&bytes.Buffer{}), Writer: bufio.NewWriter(&bytes.Buffer{})}
+ resp.Logger = ctxlog.FromContext(ctx)
+ resp.Header = http.Header{"Upgrade": {"tunnel"}}
+ if u, ok := service.URLFromContext(ctx); ok {
+ resp.Header.Set("X-Arvados-Internal-Url", u.String())
+ } else if forceInternalURLForTest != nil {
+ resp.Header.Set("X-Arvados-Internal-Url", forceInternalURLForTest.String())
+ }
return
}
"io"
"io/ioutil"
"net"
+ "net/http/httptest"
+ "net/url"
+ "strings"
"time"
"git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/lib/controller/router"
+ "git.arvados.org/arvados.git/lib/controller/rpc"
"git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
fmt.Fprint(h, s.ctrUUID)
authKey := fmt.Sprintf("%x", h.Sum(nil))
+ rtr := router.New(s.localdb, router.Config{})
+ srv := httptest.NewUnstartedServer(rtr)
+ srv.StartTLS()
+ // the test setup doesn't use lib/service so
+ // service.URLFromContext() returns nothing -- instead, this
+ // is how we advertise our internal URL and enable
+ // proxy-to-other-controller mode.
+ forceInternalURLForTest = &arvados.URL{Scheme: "https", Host: srv.Listener.Addr().String()}
+ ac := &arvados.Client{
+ APIHost: srv.Listener.Addr().String(),
+ AuthToken: arvadostest.Dispatch1Token,
+ Insecure: true,
+ }
s.gw = &crunchrun.Gateway{
ContainerUUID: s.ctrUUID,
AuthSecret: authKey,
Address: "localhost:0",
Log: ctxlog.TestLogger(c),
Target: crunchrun.GatewayTargetStub{},
+ ArvadosClient: ac,
}
c.Assert(s.gw.Start(), check.IsNil)
rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
Attrs: map[string]interface{}{
"state": arvados.ContainerStateLocked}})
c.Assert(err, check.IsNil)
- _, err = s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+}
+
+func (s *ContainerGatewaySuite) SetUpTest(c *check.C) {
+ // clear any tunnel sessions started by previous test cases
+ s.localdb.gwTunnelsLock.Lock()
+ s.localdb.gwTunnels = nil
+ s.localdb.gwTunnelsLock.Unlock()
+
+ rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
+ _, err := s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
UUID: s.ctrUUID,
Attrs: map[string]interface{}{
"state": arvados.ContainerStateRunning,
"gateway_address": s.gw.Address}})
c.Assert(err, check.IsNil)
-}
-func (s *ContainerGatewaySuite) SetUpTest(c *check.C) {
s.cluster.Containers.ShellAccess.Admin = true
s.cluster.Containers.ShellAccess.User = true
- _, err := arvadostest.DB(c, s.cluster).Exec(`update containers set interactive_session_started=$1 where uuid=$2`, false, s.ctrUUID)
+ _, err = arvadostest.DB(c, s.cluster).Exec(`update containers set interactive_session_started=$1 where uuid=$2`, false, s.ctrUUID)
c.Check(err, check.IsNil)
}
_, err = s.localdb.ContainerSSH(ctx, arvados.ContainerSSHOptions{UUID: s.ctrUUID})
c.Check(err, check.ErrorMatches, `.* 404 .*`)
}
+
+func (s *ContainerGatewaySuite) TestCreateTunnel(c *check.C) {
+ // no AuthSecret
+ conn, err := s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+ UUID: s.ctrUUID,
+ })
+ c.Check(err, check.ErrorMatches, `authentication error`)
+ c.Check(conn.Conn, check.IsNil)
+
+ // bogus AuthSecret
+ conn, err = s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+ UUID: s.ctrUUID,
+ AuthSecret: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
+ })
+ c.Check(err, check.ErrorMatches, `authentication error`)
+ c.Check(conn.Conn, check.IsNil)
+
+ // good AuthSecret
+ conn, err = s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+ UUID: s.ctrUUID,
+ AuthSecret: s.gw.AuthSecret,
+ })
+ c.Check(err, check.IsNil)
+ c.Check(conn.Conn, check.NotNil)
+}
+
+func (s *ContainerGatewaySuite) TestConnectThroughTunnelWithProxyOK(c *check.C) {
+ forceProxyForTest = true
+ defer func() { forceProxyForTest = false }()
+ s.cluster.Services.Controller.InternalURLs[*forceInternalURLForTest] = arvados.ServiceInstance{}
+ defer delete(s.cluster.Services.Controller.InternalURLs, *forceInternalURLForTest)
+ s.testConnectThroughTunnel(c, "")
+}
+
+func (s *ContainerGatewaySuite) TestConnectThroughTunnelWithProxyError(c *check.C) {
+ forceProxyForTest = true
+ defer func() { forceProxyForTest = false }()
+ // forceInternalURLForTest shouldn't be used because it isn't
+ // listed in s.cluster.Services.Controller.InternalURLs
+ s.testConnectThroughTunnel(c, `.*tunnel endpoint is invalid.*`)
+}
+
+func (s *ContainerGatewaySuite) TestConnectThroughTunnelNoProxyOK(c *check.C) {
+ s.testConnectThroughTunnel(c, "")
+}
+
+func (s *ContainerGatewaySuite) testConnectThroughTunnel(c *check.C, expectErrorMatch string) {
+ rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
+ // Until the tunnel starts up, set gateway_address to a value
+ // that can't work. We want to ensure the only way we can
+ // reach the gateway is through the tunnel.
+ gwaddr := "127.0.0.1:0"
+ tungw := &crunchrun.Gateway{
+ ContainerUUID: s.ctrUUID,
+ AuthSecret: s.gw.AuthSecret,
+ Log: ctxlog.TestLogger(c),
+ Target: crunchrun.GatewayTargetStub{},
+ ArvadosClient: s.gw.ArvadosClient,
+ UpdateTunnelURL: func(url string) {
+ c.Logf("UpdateTunnelURL(%q)", url)
+ gwaddr = "tunnel " + url
+ s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+ UUID: s.ctrUUID,
+ Attrs: map[string]interface{}{
+ "gateway_address": gwaddr}})
+ },
+ }
+ c.Assert(tungw.Start(), check.IsNil)
+
+ // We didn't supply an external hostname in the Address field,
+ // so Start() should assign a local address.
+ host, _, err := net.SplitHostPort(tungw.Address)
+ c.Assert(err, check.IsNil)
+ c.Check(host, check.Equals, "127.0.0.1")
+
+ _, err = s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+ UUID: s.ctrUUID,
+ Attrs: map[string]interface{}{
+ "state": arvados.ContainerStateRunning,
+ "gateway_address": gwaddr}})
+ c.Assert(err, check.IsNil)
+
+ for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); time.Sleep(time.Second / 2) {
+ ctr, err := s.localdb.ContainerGet(s.ctx, arvados.GetOptions{UUID: s.ctrUUID})
+ c.Assert(err, check.IsNil)
+ c.Check(ctr.InteractiveSessionStarted, check.Equals, false)
+ c.Logf("ctr.GatewayAddress == %s", ctr.GatewayAddress)
+ if strings.HasPrefix(ctr.GatewayAddress, "tunnel ") {
+ break
+ }
+ }
+
+ c.Log("connecting to gateway through tunnel")
+ arpc := rpc.NewConn("", &url.URL{Scheme: "https", Host: s.gw.ArvadosClient.APIHost}, true, rpc.PassthroughTokenProvider)
+ sshconn, err := arpc.ContainerSSH(s.ctx, arvados.ContainerSSHOptions{UUID: s.ctrUUID})
+ if expectErrorMatch != "" {
+ c.Check(err, check.ErrorMatches, expectErrorMatch)
+ return
+ }
+ c.Assert(err, check.IsNil)
+ c.Assert(sshconn.Conn, check.NotNil)
+ defer sshconn.Conn.Close()
+
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+
+ // Receive text banner
+ buf := make([]byte, 12)
+ _, err := io.ReadFull(sshconn.Conn, buf)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf), check.Equals, "SSH-2.0-Go\r\n")
+
+ // Send text banner
+ _, err = sshconn.Conn.Write([]byte("SSH-2.0-Fake\r\n"))
+ c.Check(err, check.IsNil)
+
+ // Receive binary
+ _, err = io.ReadFull(sshconn.Conn, buf[:4])
+ c.Check(err, check.IsNil)
+
+ // If we can get this far into an SSH handshake...
+ c.Logf("was able to read %x -- success, tunnel is working", buf[:4])
+ }()
+ select {
+ case <-done:
+ case <-time.After(time.Second):
+ c.Fail()
+ }
+ ctr, err := s.localdb.ContainerGet(s.ctx, arvados.GetOptions{UUID: s.ctrUUID})
+ c.Check(err, check.IsNil)
+ c.Check(ctr.InteractiveSessionStarted, check.Equals, true)
+}
hdrOut[k] = v
}
}
- xff := reqIn.RemoteAddr
- if xffIn := reqIn.Header.Get("X-Forwarded-For"); xffIn != "" {
- xff = xffIn + "," + xff
+ xff := ""
+ for _, xffIn := range reqIn.Header["X-Forwarded-For"] {
+ if xffIn != "" {
+ xff += xffIn + ","
+ }
}
+ xff += reqIn.RemoteAddr
hdrOut.Set("X-Forwarded-For", xff)
if hdrOut.Get("X-Forwarded-Proto") == "" {
hdrOut.Set("X-Forwarded-Proto", reqIn.URL.Scheme)
"bypass_federation": true,
"recursive": true,
"exclude_home_project": true,
+ "no_forward": true,
}
func stringToBool(s string) bool {
return rtr.backend.ContainerSSH(ctx, *opts.(*arvados.ContainerSSHOptions))
},
},
+ {
+ // arvados-client built before commit
+ // bdc29d3129f6d75aa9ce0a24ffb849a272b06f08
+ // used GET with params in headers instead of
+ // POST form
+ arvados.APIEndpoint{"GET", "arvados/v1/connect/{uuid}/ssh", ""},
+ func() interface{} { return &arvados.ContainerSSHOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return nil, httpError(http.StatusGone, fmt.Errorf("API endpoint is obsolete -- please upgrade your arvados-client program"))
+ },
+ },
+ {
+ arvados.EndpointContainerGatewayTunnel,
+ func() interface{} { return &arvados.ContainerGatewayTunnelOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.ContainerGatewayTunnel(ctx, *opts.(*arvados.ContainerGatewayTunnelOptions))
+ },
+ },
{
arvados.EndpointGroupCreate,
func() interface{} { return &arvados.CreateOptions{} },
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
)
// ContainerSSH returns a connection to the out-of-band SSH server for
// a running container. If the returned error is nil, the caller is
// responsible for closing sshconn.Conn.
-func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (sshconn arvados.ContainerSSHConnection, err error) {
+func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (sshconn arvados.ConnectionResponse, err error) {
+	// Substitute the target UUID into the endpoint path template
+	// and resolve it against the configured base URL.
+	path := strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1)
+	target, parseErr := conn.baseURL.Parse("/" + path)
+	if parseErr != nil {
+		return sshconn, fmt.Errorf("url.Parse: %w", parseErr)
+	}
+	// Session options are passed to socket() as its postform
+	// argument.
+	form := url.Values{}
+	form.Set("detach_keys", options.DetachKeys)
+	form.Set("login_username", options.LoginUsername)
+	form.Set("no_forward", fmt.Sprint(options.NoForward))
+	return conn.socket(ctx, target, "ssh", form)
+}
+
+// ContainerGatewayTunnel opens a connection to a yamux session on
+// the controller, authenticating with options.AuthSecret. The caller
+// is expected to attach the returned resp.Conn to a client-side
+// yamux session.
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (tunnelconn arvados.ConnectionResponse, err error) {
+	// Substitute the container UUID into the endpoint path
+	// template and resolve it against the configured base URL.
+	path := strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", options.UUID, -1)
+	target, parseErr := conn.baseURL.Parse("/" + path)
+	if parseErr != nil {
+		return tunnelconn, fmt.Errorf("url.Parse: %w", parseErr)
+	}
+	form := url.Values{}
+	form.Set("auth_secret", options.AuthSecret)
+	return conn.socket(ctx, target, "tunnel", form)
+}
+
+// socket sets up a socket using the specified API endpoint and
+// upgrade header.
+func (conn *Conn) socket(ctx context.Context, u *url.URL, upgradeHeader string, postform url.Values) (connresp arvados.ConnectionResponse, err error) {
addr := conn.baseURL.Host
if strings.Index(addr, ":") < 1 || (strings.Contains(addr, "::") && addr[0] != '[') {
// hostname or ::1 or 1::1
}
netconn, err := tls.Dial("tcp", addr, &tls.Config{InsecureSkipVerify: insecure})
if err != nil {
- err = fmt.Errorf("tls.Dial: %w", err)
- return
+ return connresp, fmt.Errorf("tls.Dial: %w", err)
}
defer func() {
if err != nil {
bufr := bufio.NewReader(netconn)
bufw := bufio.NewWriter(netconn)
- u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1))
- if err != nil {
- err = fmt.Errorf("tls.Dial: %w", err)
- return
- }
- u.RawQuery = url.Values{
- "detach_keys": {options.DetachKeys},
- "login_username": {options.LoginUsername},
- }.Encode()
tokens, err := conn.tokenProvider(ctx)
if err != nil {
- return
+ return connresp, err
} else if len(tokens) < 1 {
- err = httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
- return
+ return connresp, httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
}
- bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n")
+ postdata := postform.Encode()
+ bufw.WriteString("POST " + u.String() + " HTTP/1.1\r\n")
bufw.WriteString("Authorization: Bearer " + tokens[0] + "\r\n")
bufw.WriteString("Host: " + u.Host + "\r\n")
- bufw.WriteString("Upgrade: ssh\r\n")
+ bufw.WriteString("Upgrade: " + upgradeHeader + "\r\n")
+ bufw.WriteString("Content-Type: application/x-www-form-urlencoded\r\n")
+ fmt.Fprintf(bufw, "Content-Length: %d\r\n", len(postdata))
bufw.WriteString("\r\n")
+ bufw.WriteString(postdata)
bufw.Flush()
- resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"})
+ resp, err := http.ReadResponse(bufr, &http.Request{Method: "POST"})
if err != nil {
- err = fmt.Errorf("http.ReadResponse: %w", err)
- return
+ return connresp, fmt.Errorf("http.ReadResponse: %w", err)
}
+ defer resp.Body.Close()
if resp.StatusCode != http.StatusSwitchingProtocols {
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
+ ctxlog.FromContext(ctx).Infof("rpc.Conn.socket: server %s did not switch protocols, got status %s", u.String(), resp.Status)
+ body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 10000))
var message string
var errDoc httpserver.ErrorResponse
if err := json.Unmarshal(body, &errDoc); err == nil {
} else {
message = fmt.Sprintf("%q", body)
}
- err = fmt.Errorf("server did not provide a tunnel: %s (HTTP %d)", message, resp.StatusCode)
- return
+ return connresp, fmt.Errorf("server did not provide a tunnel: %s: %s", resp.Status, message)
}
- if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" ||
+ if strings.ToLower(resp.Header.Get("Upgrade")) != upgradeHeader ||
strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {
- err = fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection"))
- return
+ return connresp, fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection"))
}
- sshconn.Conn = netconn
- sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
- return
+ connresp.Conn = netconn
+ connresp.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
+ connresp.Header = resp.Header
+ return connresp, nil
}
func (conn *Conn) ContainerRequestCreate(ctx context.Context, options arvados.CreateOptions) (arvados.ContainerRequest, error) {
"io"
"net"
"net/http"
+ "net/url"
"os"
"os/exec"
"sync"
"syscall"
+ "time"
+ "git.arvados.org/arvados.git/lib/controller/rpc"
"git.arvados.org/arvados.git/lib/selfsigned"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/auth"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
"github.com/creack/pty"
"github.com/google/shlex"
+ "github.com/hashicorp/yamux"
"golang.org/x/crypto/ssh"
"golang.org/x/net/context"
)
type Gateway struct {
ContainerUUID string
- Address string // listen host:port; if port=0, Start() will change it to the selected port
- AuthSecret string
- Target GatewayTarget
- Log interface {
+ // Caller should set Address to "", or "host:0" or "host:port"
+ // where host is a known external IP address; port is a
+ // desired port number to listen on; and ":0" chooses an
+ // available dynamic port.
+ //
+ // If Address is "", Start() listens only on the loopback
+ // interface (and changes Address to "127.0.0.1:port").
+ // Otherwise it listens on all interfaces.
+ //
+ // If Address is "host:0", Start() updates Address to
+ // "host:port".
+ Address string
+ AuthSecret string
+ Target GatewayTarget
+ Log interface {
Printf(fmt string, args ...interface{})
}
+ // If non-nil, set up a ContainerGatewayTunnel, so that the
+ // controller can connect to us even if our external IP
+ // address is unknown or not routable from controller.
+ ArvadosClient *arvados.Client
+
+ // When a tunnel is connected or reconnected, this func (if
+ // not nil) will be called with the InternalURL of the
+ // controller process at the other end of the tunnel.
+ UpdateTunnelURL func(url string)
sshConfig ssh.ServerConfig
requestAuth string
// from arvados-controller, and PORT is either the desired
// port where we should run our gateway server, or "0" if we
// should choose an available port.
- host, port, err := net.SplitHostPort(gw.Address)
+ extAddr := gw.Address
+ // Generally we can't know which local interface corresponds
+ // to an externally reachable IP address, so if we expect to
+ // be reachable by external hosts, we listen on all
+ // interfaces.
+ listenHost := ""
+ if extAddr == "" {
+ // If the dispatcher doesn't tell us our external IP
+ // address, controller will only be able to connect
+ // through the tunnel (see runTunnel), so our gateway
+ // server only needs to listen on the loopback
+ // interface.
+ extAddr = "127.0.0.1:0"
+ listenHost = "127.0.0.1"
+ }
+ extHost, extPort, err := net.SplitHostPort(extAddr)
if err != nil {
return err
}
Certificates: []tls.Certificate{cert},
},
},
- Addr: ":" + port,
+ Addr: net.JoinHostPort(listenHost, extPort),
}
err = srv.Start()
if err != nil {
return err
}
- // Get the port number we are listening on (the port might be
+ go func() {
+ err := srv.Wait()
+ gw.Log.Printf("gateway server stopped: %s", err)
+ }()
+ // Get the port number we are listening on (extPort might be
// "0" or a port name, in which case this will be different).
- _, port, err = net.SplitHostPort(srv.Addr)
+ _, listenPort, err := net.SplitHostPort(srv.Addr)
if err != nil {
return err
}
- // When changing state to Running, we will set
- // gateway_address to "HOST:PORT" where HOST is our
- // external hostname/IP as provided by arvados-dispatch-cloud,
- // and PORT is the port number we ended up listening on.
- gw.Address = net.JoinHostPort(host, port)
+ // When changing state to Running, the caller will want to set
+ // gateway_address to a "HOST:PORT" that, if controller
+ // connects to it, will reach this gateway server.
+ //
+ // The most likely thing to work is: HOST is our external
+ // hostname/IP as provided by the caller
+ // (arvados-dispatch-cloud) or 127.0.0.1 to indicate
+ // non-tunnel connections aren't available; and PORT is the
+ // port number we are listening on.
+ gw.Address = net.JoinHostPort(extHost, listenPort)
+ gw.Log.Printf("gateway server listening at %s", gw.Address)
+ if gw.ArvadosClient != nil {
+ go gw.maintainTunnel(gw.Address)
+ }
return nil
}
+func (gw *Gateway) maintainTunnel(addr string) {
+ for ; ; time.Sleep(5 * time.Second) {
+ err := gw.runTunnel(addr)
+ gw.Log.Printf("runTunnel: %s", err)
+ }
+}
+
+// runTunnel connects to controller and sets up a tunnel through
+// which controller can connect to the gateway server at the given
+// addr.
+//
+// Each yamux stream accepted from the controller is relayed to a
+// new TCP connection to addr. runTunnel blocks until the tunnel
+// cannot be established or the yamux session stops accepting
+// streams, and returns the corresponding error. The tunnel
+// connection and the yamux session are closed before returning, so
+// that repeated calls (see maintainTunnel) do not leak connections.
+func (gw *Gateway) runTunnel(addr string) error {
+	ctx := auth.NewContext(context.Background(), auth.NewCredentials(gw.ArvadosClient.AuthToken))
+	arpc := rpc.NewConn("", &url.URL{Scheme: "https", Host: gw.ArvadosClient.APIHost}, gw.ArvadosClient.Insecure, rpc.PassthroughTokenProvider)
+	tun, err := arpc.ContainerGatewayTunnel(ctx, arvados.ContainerGatewayTunnelOptions{
+		UUID:       gw.ContainerUUID,
+		AuthSecret: gw.AuthSecret,
+	})
+	if err != nil {
+		return fmt.Errorf("error creating gateway tunnel: %w", err)
+	}
+	// We own the tunnel connection from here on. Closing it on
+	// every return path prevents a leaked socket per retry.
+	defer tun.Conn.Close()
+	mux, err := yamux.Client(tun.Conn, nil)
+	if err != nil {
+		return fmt.Errorf("error setting up mux client end: %w", err)
+	}
+	// Shut down the session (and any streams still open on it)
+	// when we give up on this tunnel.
+	defer mux.Close()
+	// Report which controller process holds the other end of the
+	// tunnel, if it told us and the caller wants to know.
+	if internalURL := tun.Header.Get("X-Arvados-Internal-Url"); internalURL != "" && gw.UpdateTunnelURL != nil {
+		gw.UpdateTunnelURL(internalURL)
+	}
+	for {
+		muxconn, err := mux.AcceptStream()
+		if err != nil {
+			return err
+		}
+		gw.Log.Printf("tunnel connection %d started", muxconn.StreamID())
+		go func() {
+			defer muxconn.Close()
+			gwconn, err := net.Dial("tcp", addr)
+			if err != nil {
+				gw.Log.Printf("tunnel connection %d: error connecting to %s: %s", muxconn.StreamID(), addr, err)
+				return
+			}
+			defer gwconn.Close()
+			// Relay in both directions. When one direction
+			// finishes, closing the opposite end unblocks
+			// the other copy.
+			var wg sync.WaitGroup
+			wg.Add(2)
+			go func() {
+				defer wg.Done()
+				_, err := io.Copy(gwconn, muxconn)
+				if err != nil {
+					gw.Log.Printf("tunnel connection %d: mux end: %s", muxconn.StreamID(), err)
+				}
+				gwconn.Close()
+			}()
+			go func() {
+				defer wg.Done()
+				_, err := io.Copy(muxconn, gwconn)
+				if err != nil {
+					gw.Log.Printf("tunnel connection %d: gateway end: %s", muxconn.StreamID(), err)
+				}
+				muxconn.Close()
+			}()
+			wg.Wait()
+			gw.Log.Printf("tunnel connection %d finished", muxconn.StreamID())
+		}()
+	}
+}
+
// handleSSH connects to an SSH server that allows the caller to run
// interactive commands as root (or any other desired user) inside the
// container. The tunnel itself can only be created by an
// In future we'll handle browser traffic too, but for now the
// only traffic we expect is an SSH tunnel from
// (*lib/controller/localdb.Conn)ContainerSSH()
- if req.Method != "GET" || req.Header.Get("Upgrade") != "ssh" {
+ if req.Method != "POST" || req.Header.Get("Upgrade") != "ssh" {
http.Error(w, "path not found", http.StatusNotFound)
return
}
- if want := req.Header.Get("X-Arvados-Target-Uuid"); want != gw.ContainerUUID {
+ req.ParseForm()
+ if want := req.Form.Get("uuid"); want != gw.ContainerUUID {
http.Error(w, fmt.Sprintf("misdirected request: meant for %q but received by crunch-run %q", want, gw.ContainerUUID), http.StatusBadGateway)
return
}
http.Error(w, "bad X-Arvados-Authorization header", http.StatusUnauthorized)
return
}
- detachKeys := req.Header.Get("X-Arvados-Detach-Keys")
- username := req.Header.Get("X-Arvados-Login-Username")
+ detachKeys := req.Form.Get("detach_keys")
+ username := req.Form.Get("login_username")
if username == "" {
username = "root"
}
ctx := req.Context()
conn, newchans, reqs, err := ssh.NewServerConn(netconn, &gw.sshConfig)
- if err != nil {
+ if err == io.EOF {
+ return
+ } else if err != nil {
gw.Log.Printf("ssh.NewServerConn: %s", err)
return
}
func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, detachKeys, username string) {
ch, reqs, err := newch.Accept()
if err != nil {
- gw.Log.Printf("accept session channel: %s", err)
+ gw.Log.Printf("error accepting session channel: %s", err)
return
}
+ defer ch.Close()
+
var pty0, tty0 *os.File
// Where to send errors/messages for the client to see
logw := io.Writer(ch.Stderr())
eol := "\n"
// Env vars to add to child process
termEnv := []string(nil)
- for req := range reqs {
+
+ started := 0
+ wantClose := make(chan struct{})
+ for {
+ var req *ssh.Request
+ select {
+ case r, ok := <-reqs:
+ if !ok {
+ return
+ }
+ req = r
+ case <-wantClose:
+ return
+ }
ok := false
switch req.Type {
case "shell", "exec":
+ if started++; started != 1 {
+ // RFC 4254 6.5: "Only one of these
+ // requests can succeed per channel."
+ break
+ }
ok = true
var payload struct {
Command string
}
defer func() {
ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
- ch.Close()
+ close(wantClose)
}()
cmd, err := gw.Target.InjectCommand(ctx, detachKeys, username, tty0 != nil, execargs)
resp.Status = 1
return
}
- cmd.Stdin = ch
- cmd.Stdout = ch
- cmd.Stderr = ch.Stderr()
if tty0 != nil {
cmd.Stdin = tty0
cmd.Stdout = tty0
cmd.Stderr = tty0
- var wg sync.WaitGroup
- defer wg.Wait()
- wg.Add(2)
- go func() { io.Copy(ch, pty0); wg.Done() }()
- go func() { io.Copy(pty0, ch); wg.Done() }()
+ go io.Copy(ch, pty0)
+ go io.Copy(pty0, ch)
// Send our own debug messages to tty as well.
logw = tty0
+ } else {
+ // StdinPipe may seem
+ // superfluous here, but it's
+ // not: it causes cmd.Run() to
+ // return when the subprocess
+ // exits. Without it, Run()
+ // waits for stdin to close,
+ // which causes "ssh ... echo
+ // ok" (with the client's
+ // stdin connected to a
+ // terminal or something) to
+ // hang.
+ stdin, err := cmd.StdinPipe()
+ if err != nil {
+ fmt.Fprintln(ch.Stderr(), err)
+ ch.CloseWrite()
+ resp.Status = 1
+ return
+ }
+ go func() {
+ io.Copy(stdin, ch)
+ stdin.Close()
+ }()
+ cmd.Stdout = ch
+ cmd.Stderr = ch.Stderr()
}
cmd.SysProcAttr = &syscall.SysProcAttr{
Setctty: tty0 != nil,
// would be a gaping security
// hole).
default:
- // fmt.Fprintf(logw, "declining %q req"+eol, req.Type)
+ // fmt.Fprintf(logw, "declined request %q on ssh channel"+eol, req.Type)
}
if req.WantReply {
req.Reply(ok, nil)
runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+ version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
ignoreDetachFlag := false
if len(args) > 0 && args[0] == "-no-detach" {
if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
return code
+ } else if *version {
+ fmt.Fprintln(stdout, prog, cmd.Version.String())
+ return 0
} else if !*list && flags.NArg() != 1 {
fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
return 2
// not safe to run a gateway service without an auth
// secret
cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
- } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
- // dispatcher did not tell us which external IP
- // address to advertise --> no gateway service
- cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
} else {
+ gwListen := os.Getenv("GatewayAddress")
cr.gateway = Gateway{
Address: gwListen,
AuthSecret: gwAuthSecret,
Target: cr.executor,
Log: cr.CrunchLog,
}
+ if gwListen == "" {
+ // Direct connection won't work, so we use the
+ // gateway_address field to indicate the
+ // internalURL of the controller process that
+ // has the current tunnel connection.
+ cr.gateway.ArvadosClient = cr.dispatcherClient
+ cr.gateway.UpdateTunnelURL = func(url string) {
+ cr.gateway.Address = "tunnel " + url
+ cr.DispatcherArvClient.Update("containers", containerUUID,
+ arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+ }
+ }
err = cr.gateway.Start()
if err != nil {
log.Printf("error starting gateway server: %s", err)
c.Check(s.logFiles["crunch-run.txt"], Not(Matches), `(?ms).* at http://169\.254\..*`)
c.Check(s.logFiles["stderr.txt"], Matches, `(?ms).*ARVADOS_KEEP_SERVICES=http://[\d\.]{7,}:\d+\n.*`)
}
+}
+func (s *integrationSuite) TestRunTrivialContainerWithNoLocalKeepstore(c *C) {
// Check that (1) config is loaded from $ARVADOS_CONFIG when
// not provided on stdin and (2) if a local keepstore is not
// started, crunch-run.txt explains why not.
s.SetUpTest(c)
s.stdin.Reset()
s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because KeepBuffers=0 in config\n.*`)
+
+ s.SetUpTest(c)
+ s.args = []string{"-config", c.MkDir() + "/config.yaml"}
+ s.stdin.Reset()
+ buf, err := ioutil.ReadFile(os.Getenv("ARVADOS_CONFIG"))
+ c.Assert(err, IsNil)
+ err = ioutil.WriteFile(s.args[1], bytes.Replace(buf, []byte("LocalKeepBlobBuffersPerVCPU: 0"), []byte("LocalKeepBlobBuffersPerVCPU: 1"), -1), 0666)
+ c.Assert(err, IsNil)
+ s.testRunTrivialContainer(c)
c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because a volume \(zzzzz-nyw5e-00000000000000\d\) uses AccessViaHosts\n.*`)
// Check that config read errors are logged
s.SetUpTest(c)
s.args = []string{"-config", c.MkDir() + "/config-unreadable.yaml"}
s.stdin.Reset()
- err := ioutil.WriteFile(s.args[1], []byte{}, 0)
+ err = ioutil.WriteFile(s.args[1], []byte{}, 0)
c.Check(err, IsNil)
s.testRunTrivialContainer(c)
c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* permission denied\n.*`)
import (
"context"
+ "crypto/hmac"
+ "crypto/sha256"
"errors"
"fmt"
"math"
var crArgs []string
crArgs = append(crArgs, crunchRunCommand...)
crArgs = append(crArgs, container.UUID)
- crScript := execScript(crArgs)
+
+ h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken))
+ fmt.Fprint(h, container.UUID)
+ authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+ crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret})
bsubArgs, err := disp.bsubArgs(container)
if err != nil {
}
}
-func execScript(args []string) []byte {
- s := "#!/bin/sh\nexec"
+func execScript(args []string, env map[string]string) []byte {
+ s := "#!/bin/sh\n"
+ for k, v := range env {
+ s += k + `='`
+ s += strings.Replace(v, `'`, `'\''`, -1)
+ s += `' `
+ }
+ s += `exec`
for _, w := range args {
s += ` '`
s += strings.Replace(w, `'`, `'\''`, -1)
"encoding/json"
"io"
"net"
+ "net/http"
"github.com/sirupsen/logrus"
)
EndpointContainerDelete = APIEndpoint{"DELETE", "arvados/v1/containers/{uuid}", ""}
EndpointContainerLock = APIEndpoint{"POST", "arvados/v1/containers/{uuid}/lock", ""}
EndpointContainerUnlock = APIEndpoint{"POST", "arvados/v1/containers/{uuid}/unlock", ""}
- EndpointContainerSSH = APIEndpoint{"GET", "arvados/v1/connect/{uuid}/ssh", ""} // move to /containers after #17014 fixes routing
+ EndpointContainerSSH = APIEndpoint{"POST", "arvados/v1/connect/{uuid}/ssh", ""} // move to /containers after #17014 fixes routing
+ EndpointContainerGatewayTunnel = APIEndpoint{"POST", "arvados/v1/connect/{uuid}/gateway_tunnel", ""} // move to /containers after #17014 fixes routing
EndpointContainerRequestCreate = APIEndpoint{"POST", "arvados/v1/container_requests", "container_request"}
EndpointContainerRequestUpdate = APIEndpoint{"PATCH", "arvados/v1/container_requests/{uuid}", "container_request"}
EndpointContainerRequestGet = APIEndpoint{"GET", "arvados/v1/container_requests/{uuid}", ""}
UUID string `json:"uuid"`
DetachKeys string `json:"detach_keys"`
LoginUsername string `json:"login_username"`
+ NoForward bool `json:"no_forward"`
}
-type ContainerSSHConnection struct {
+type ConnectionResponse struct {
Conn net.Conn `json:"-"`
Bufrw *bufio.ReadWriter `json:"-"`
Logger logrus.FieldLogger `json:"-"`
+ Header http.Header `json:"-"`
+}
+
+type ContainerGatewayTunnelOptions struct {
+ UUID string `json:"uuid"`
+ AuthSecret string `json:"auth_secret"`
}
type GetOptions struct {
ContainerDelete(ctx context.Context, options DeleteOptions) (Container, error)
ContainerLock(ctx context.Context, options GetOptions) (Container, error)
ContainerUnlock(ctx context.Context, options GetOptions) (Container, error)
- ContainerSSH(ctx context.Context, options ContainerSSHOptions) (ContainerSSHConnection, error)
+ ContainerSSH(ctx context.Context, options ContainerSSHOptions) (ConnectionResponse, error)
+ ContainerGatewayTunnel(ctx context.Context, options ContainerGatewayTunnelOptions) (ConnectionResponse, error)
ContainerRequestCreate(ctx context.Context, options CreateOptions) (ContainerRequest, error)
ContainerRequestUpdate(ctx context.Context, options UpdateOptions) (ContainerRequest, error)
ContainerRequestGet(ctx context.Context, options GetOptions) (ContainerRequest, error)
"github.com/sirupsen/logrus"
)
-func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+func (cresp ConnectionResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ defer cresp.Conn.Close()
hj, ok := w.(http.Hijacker)
if !ok {
http.Error(w, "ResponseWriter does not support connection upgrade", http.StatusInternalServerError)
return
}
w.Header().Set("Connection", "upgrade")
- w.Header().Set("Upgrade", "ssh")
+ for k, v := range cresp.Header {
+ w.Header()[k] = v
+ }
w.WriteHeader(http.StatusSwitchingProtocols)
conn, bufrw, err := hj.Hijack()
if err != nil {
defer conn.Close()
var bytesIn, bytesOut int64
+ ctx, cancel := context.WithCancel(req.Context())
var wg sync.WaitGroup
- ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
- n, err := io.CopyN(conn, sshconn.Bufrw, int64(sshconn.Bufrw.Reader.Buffered()))
+ n, err := io.CopyN(conn, cresp.Bufrw, int64(cresp.Bufrw.Reader.Buffered()))
bytesOut += n
if err == nil {
- n, err = io.Copy(conn, sshconn.Conn)
+ n, err = io.Copy(conn, cresp.Conn)
bytesOut += n
}
if err != nil {
- ctxlog.FromContext(req.Context()).WithError(err).Error("error copying downstream")
+ ctxlog.FromContext(ctx).WithError(err).Error("error copying downstream")
}
}()
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
- n, err := io.CopyN(sshconn.Conn, bufrw, int64(bufrw.Reader.Buffered()))
+ n, err := io.CopyN(cresp.Conn, bufrw, int64(bufrw.Reader.Buffered()))
bytesIn += n
if err == nil {
- n, err = io.Copy(sshconn.Conn, conn)
+ n, err = io.Copy(cresp.Conn, conn)
bytesIn += n
}
if err != nil {
- ctxlog.FromContext(req.Context()).WithError(err).Error("error copying upstream")
+ ctxlog.FromContext(ctx).WithError(err).Error("error copying upstream")
}
}()
<-ctx.Done()
- if sshconn.Logger != nil {
- go func() {
- wg.Wait()
- sshconn.Logger.WithFields(logrus.Fields{
+ go func() {
+ // Wait for both io.Copy goroutines to finish and increment
+ // their byte counters.
+ wg.Wait()
+ if cresp.Logger != nil {
+ cresp.Logger.WithFields(logrus.Fields{
"bytesIn": bytesIn,
"bytesOut": bytesOut,
}).Info("closed connection")
- }()
- }
+ }
+ }()
}
as.appendCall(ctx, as.ContainerUnlock, options)
return arvados.Container{}, as.Error
}
-func (as *APIStub) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ContainerSSHConnection, error) {
+func (as *APIStub) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ConnectionResponse, error) {
as.appendCall(ctx, as.ContainerSSH, options)
- return arvados.ContainerSSHConnection{}, as.Error
+ return arvados.ConnectionResponse{}, as.Error
+}
+// ContainerGatewayTunnel passes the call and its options to
+// as.appendCall, then returns an empty ConnectionResponse together
+// with the stub's Error field.
+func (as *APIStub) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (arvados.ConnectionResponse, error) {
+ as.appendCall(ctx, as.ContainerGatewayTunnel, options)
+ return arvados.ConnectionResponse{}, as.Error
}
func (as *APIStub) ContainerRequestCreate(ctx context.Context, options arvados.CreateOptions) (arvados.ContainerRequest, error) {
as.appendCall(ctx, as.ContainerRequestCreate, options)
"JobsAPI": {
"GitInternalDir": os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'internal.git'),
},
+ "LocalKeepBlobBuffersPerVCPU": 0,
"SupportedDockerImageFormats": {"v1": {}},
"ShellAccess": {
"Admin": True,
permitted.push :priority
when Running
- permitted.push :priority, :output_properties, *progress_attrs
+ permitted.push :priority, :output_properties, :gateway_address, *progress_attrs
if self.state_changed?
- permitted.push :started_at, :gateway_address
+ permitted.push :started_at
end
if !self.interactive_session_started_was
permitted.push :interactive_session_started
Thread.current[:user] = auth.user
end
+ assert c.update_attributes(gateway_address: "127.0.0.1:9")
assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
assert c.update_attributes(runtime_status: {'warning' => 'something happened'})
assert c.update_attributes(progress: 0.5)
import (
"context"
+ "crypto/hmac"
+ "crypto/sha256"
"fmt"
"log"
"math"
crArgs := append([]string(nil), crunchRunCommand...)
crArgs = append(crArgs, "--runtime-engine="+disp.cluster.Containers.RuntimeEngine)
crArgs = append(crArgs, container.UUID)
- crScript := strings.NewReader(execScript(crArgs))
+
+ h := hmac.New(sha256.New, []byte(disp.cluster.SystemRootToken))
+ fmt.Fprint(h, container.UUID)
+ authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+ crScript := strings.NewReader(execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret}))
sbArgs, err := disp.sbatchArgs(container)
if err != nil {
"strings"
)
-func execScript(args []string) string {
- s := "#!/bin/sh\nexec"
+func execScript(args []string, env map[string]string) string {
+ s := "#!/bin/sh\n"
+ for k, v := range env {
+ s += k + `='`
+ s += strings.Replace(v, `'`, `'\''`, -1)
+ s += `' `
+ }
+ s += `exec`
for _, w := range args {
s += ` '`
s += strings.Replace(w, `'`, `'\''`, -1)
{[]string{`foo"`, "'waz 'qux\n"}, `exec 'foo"' ''\''waz '\''qux` + "\n" + `'`},
} {
c.Logf("%+v -> %+v", test.args, test.script)
- c.Check(execScript(test.args), Equals, "#!/bin/sh\n"+test.script+"\n")
+ c.Check(execScript(test.args, nil), Equals, "#!/bin/sh\n"+test.script+"\n")
}
+ c.Check(execScript([]string{"sh", "-c", "echo $foo"}, map[string]string{"foo": "b'ar"}), Equals, "#!/bin/sh\nfoo='b'\\''ar' exec 'sh' '-c' 'echo $foo'\n")
}