X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/31a270c68c24b7de994655ee788478a64b6bdfb7..4e1e7f762ff1acd13b18efed5974b32833a467e2:/lib/crunchrun/container_gateway.go diff --git a/lib/crunchrun/container_gateway.go b/lib/crunchrun/container_gateway.go index 6fae73798c..30f8957a2d 100644 --- a/lib/crunchrun/container_gateway.go +++ b/lib/crunchrun/container_gateway.go @@ -5,6 +5,7 @@ package crunchrun import ( + "context" "crypto/hmac" "crypto/rand" "crypto/rsa" @@ -17,12 +18,14 @@ import ( "net/url" "os" "os/exec" + "strings" "sync" "syscall" "time" "git.arvados.org/arvados.git/lib/controller/rpc" "git.arvados.org/arvados.git/lib/selfsigned" + "git.arvados.org/arvados.git/lib/webdavfs" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/auth" "git.arvados.org/arvados.git/sdk/go/ctxlog" @@ -31,7 +34,7 @@ import ( "github.com/google/shlex" "github.com/hashicorp/yamux" "golang.org/x/crypto/ssh" - "golang.org/x/net/context" + "golang.org/x/net/webdav" ) type GatewayTarget interface { @@ -78,6 +81,10 @@ type Gateway struct { // controller process at the other end of the tunnel. UpdateTunnelURL func(url string) + // Source for serving WebDAV requests with + // X-Webdav-Source: /log + LogCollection arvados.CollectionFileSystem + sshConfig ssh.ServerConfig requestAuth string respondAuth string @@ -157,7 +164,7 @@ func (gw *Gateway) Start() error { srv := &httpserver.Server{ Server: http.Server{ - Handler: http.HandlerFunc(gw.handleSSH), + Handler: gw, TLSConfig: &tls.Config{ Certificates: []tls.Certificate{cert}, }, @@ -242,18 +249,16 @@ func (gw *Gateway) runTunnel(addr string) error { defer wg.Done() _, err := io.Copy(gwconn, muxconn) if err != nil { - gw.Log.Printf("tunnel connection %d: tunnel: %s", muxconn.StreamID(), err) + gw.Log.Printf("tunnel connection %d: mux end: %s", muxconn.StreamID(), err) } - muxconn.Close() gwconn.Close() }() go func() { defer wg.Done() _, err := io.Copy(muxconn, gwconn) if err != nil { - gw.Log.Printf("tunnel connection %d: gateway: %s", muxconn.StreamID(), err) + gw.Log.Printf("tunnel connection %d: gateway end: %s", muxconn.StreamID(), err) } - gwconn.Close() muxconn.Close() }() wg.Wait() @@ -262,6 +267,75 @@ func (gw *Gateway) runTunnel(addr string) error { } } +var webdavMethod = map[string]bool{ + "GET": true, + "OPTIONS": true, + "PROPFIND": true, +} + +func (gw *Gateway) ServeHTTP(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Vary", "X-Arvados-Authorization, X-Arvados-Container-Gateway-Uuid, X-Webdav-Prefix, X-Webdav-Source") + reqUUID := req.Header.Get("X-Arvados-Container-Gateway-Uuid") + if reqUUID == "" { + // older controller versions only send UUID as query param + req.ParseForm() + reqUUID = req.Form.Get("uuid") + } + if reqUUID != gw.ContainerUUID { + http.Error(w, fmt.Sprintf("misdirected request: meant for %q but received by crunch-run %q", reqUUID, gw.ContainerUUID), http.StatusBadGateway) + return + } + if req.Header.Get("X-Arvados-Authorization") != gw.requestAuth { + http.Error(w, "bad X-Arvados-Authorization header", http.StatusUnauthorized) + return + } + w.Header().Set("X-Arvados-Authorization-Response", gw.respondAuth) + switch { + case req.Method == "POST" && req.Header.Get("Upgrade") == "ssh": + gw.handleSSH(w, req) + case req.Header.Get("X-Webdav-Source") == "/log": + if !webdavMethod[req.Method] { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + gw.handleLogsWebDAV(w, req) + default: + http.Error(w, "path not found", http.StatusNotFound) + } +} + +func (gw *Gateway) handleLogsWebDAV(w http.ResponseWriter, r *http.Request) { + prefix := r.Header.Get("X-Webdav-Prefix") + if !strings.HasPrefix(r.URL.Path, prefix) { + http.Error(w, "X-Webdav-Prefix header is not a prefix of the requested path", http.StatusBadRequest) + return + } + if gw.LogCollection == nil { + http.Error(w, "Not found", http.StatusNotFound) + return + } + wh := webdav.Handler{ + Prefix: prefix, + FileSystem: &webdavfs.FS{ + FileSystem: gw.LogCollection, + Prefix: "", + Writing: false, + AlwaysReadEOF: r.Method == "PROPFIND", + }, + LockSystem: webdavfs.NoLockSystem, + Logger: gw.webdavLogger, + } + wh.ServeHTTP(w, r) +} + +func (gw *Gateway) webdavLogger(r *http.Request, err error) { + if err != nil && !os.IsNotExist(err) { + ctxlog.FromContext(r.Context()).WithError(err).Info("error reported by webdav handler") + } else { + ctxlog.FromContext(r.Context()).WithError(err).Debug("webdav request log") + } +} + // handleSSH connects to an SSH server that allows the caller to run // interactive commands as root (or any other desired user) inside the // container. The tunnel itself can only be created by an @@ -284,22 +358,7 @@ func (gw *Gateway) runTunnel(addr string) error { // X-Arvados-Login-Username: argument to "docker exec --user": account // used to run command(s) inside the container. func (gw *Gateway) handleSSH(w http.ResponseWriter, req *http.Request) { - // In future we'll handle browser traffic too, but for now the - // only traffic we expect is an SSH tunnel from - // (*lib/controller/localdb.Conn)ContainerSSH() - if req.Method != "POST" || req.Header.Get("Upgrade") != "ssh" { - http.Error(w, "path not found", http.StatusNotFound) - return - } req.ParseForm() - if want := req.Form.Get("uuid"); want != gw.ContainerUUID { - http.Error(w, fmt.Sprintf("misdirected request: meant for %q but received by crunch-run %q", want, gw.ContainerUUID), http.StatusBadGateway) - return - } - if req.Header.Get("X-Arvados-Authorization") != gw.requestAuth { - http.Error(w, "bad X-Arvados-Authorization header", http.StatusUnauthorized) - return - } detachKeys := req.Form.Get("detach_keys") username := req.Form.Get("login_username") if username == "" { @@ -318,7 +377,6 @@ func (gw *Gateway) handleSSH(w http.ResponseWriter, req *http.Request) { defer netconn.Close() w.Header().Set("Connection", "upgrade") w.Header().Set("Upgrade", "ssh") - w.Header().Set("X-Arvados-Authorization-Response", gw.respondAuth) netconn.Write([]byte("HTTP/1.1 101 Switching Protocols\r\n")) w.Header().Write(netconn) netconn.Write([]byte("\r\n")) @@ -402,9 +460,11 @@ func (gw *Gateway) handleDirectTCPIP(ctx context.Context, newch ssh.NewChannel) func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, detachKeys, username string) { ch, reqs, err := newch.Accept() if err != nil { - gw.Log.Printf("accept session channel: %s", err) + gw.Log.Printf("error accepting session channel: %s", err) return } + defer ch.Close() + var pty0, tty0 *os.File // Where to send errors/messages for the client to see logw := io.Writer(ch.Stderr()) @@ -413,10 +473,28 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta eol := "\n" // Env vars to add to child process termEnv := []string(nil) - for req := range reqs { + + started := 0 + wantClose := make(chan struct{}) + for { + var req *ssh.Request + select { + case r, ok := <-reqs: + if !ok { + return + } + req = r + case <-wantClose: + return + } ok := false switch req.Type { case "shell", "exec": + if started++; started != 1 { + // RFC 4254 6.5: "Only one of these + // requests can succeed per channel." + break + } ok = true var payload struct { Command string @@ -436,7 +514,7 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta } defer func() { ch.SendRequest("exit-status", false, ssh.Marshal(&resp)) - ch.Close() + close(wantClose) }() cmd, err := gw.Target.InjectCommand(ctx, detachKeys, username, tty0 != nil, execargs) @@ -446,20 +524,39 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta resp.Status = 1 return } - cmd.Stdin = ch - cmd.Stdout = ch - cmd.Stderr = ch.Stderr() if tty0 != nil { cmd.Stdin = tty0 cmd.Stdout = tty0 cmd.Stderr = tty0 - var wg sync.WaitGroup - defer wg.Wait() - wg.Add(2) - go func() { io.Copy(ch, pty0); wg.Done() }() - go func() { io.Copy(pty0, ch); wg.Done() }() + go io.Copy(ch, pty0) + go io.Copy(pty0, ch) // Send our own debug messages to tty as well. logw = tty0 + } else { + // StdinPipe may seem + // superfluous here, but it's + // not: it causes cmd.Run() to + // return when the subprocess + // exits. Without it, Run() + // waits for stdin to close, + // which causes "ssh ... echo + // ok" (with the client's + // stdin connected to a + // terminal or something) to + // hang. + stdin, err := cmd.StdinPipe() + if err != nil { + fmt.Fprintln(ch.Stderr(), err) + ch.CloseWrite() + resp.Status = 1 + return + } + go func() { + io.Copy(stdin, ch) + stdin.Close() + }() + cmd.Stdout = ch + cmd.Stderr = ch.Stderr() } cmd.SysProcAttr = &syscall.SysProcAttr{ Setctty: tty0 != nil, @@ -527,7 +624,7 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta // would be a gaping security // hole). default: - // fmt.Fprintf(logw, "declining %q req"+eol, req.Type) + // fmt.Fprintf(logw, "declined request %q on ssh channel"+eol, req.Type) } if req.WantReply { req.Reply(ok, nil)