19099: Refactor container shell backend so it's not docker-specific.
author Tom Clegg <tom@curii.com>
Thu, 12 May 2022 14:12:29 +0000 (10:12 -0400)
committer Tom Clegg <tom@curii.com>
Thu, 12 May 2022 14:12:29 +0000 (10:12 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/crunchrun/container_gateway.go
lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go
lib/crunchrun/docker.go
lib/crunchrun/executor.go
lib/crunchrun/executor_test.go
lib/crunchrun/singularity.go

index 2ec24bac788f5a301a823621724eff82438a5fd7..62979da21b38c69f3d83189b90988d71806d8dc5 100644 (file)
@@ -17,30 +17,33 @@ import (
        "os"
        "os/exec"
        "sync"
        "os"
        "os/exec"
        "sync"
-       "sync/atomic"
        "syscall"
        "syscall"
-       "time"
 
        "git.arvados.org/arvados.git/lib/selfsigned"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "github.com/creack/pty"
 
        "git.arvados.org/arvados.git/lib/selfsigned"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "github.com/creack/pty"
-       dockerclient "github.com/docker/docker/client"
        "github.com/google/shlex"
        "golang.org/x/crypto/ssh"
        "golang.org/x/net/context"
 )
 
        "github.com/google/shlex"
        "golang.org/x/crypto/ssh"
        "golang.org/x/net/context"
 )
 
+type GatewayTarget interface { // GatewayTarget is the container-runtime side of a Gateway: the two operations a Gateway invokes through its Target field.
+       // InjectCommand returns a command that will execute cmd inside the container. In this change the callers attach stdio and run the returned command themselves; usingTTY reports whether the caller has a pseudo-terminal.
+       InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, cmd []string) (*exec.Cmd, error)
+
+       // IPAddress returns the IP address inside the container. The Gateway treats either an error or an empty string as "container has no IP address".
+       IPAddress() (string, error)
+}
+
 type Gateway struct {
 type Gateway struct {
-       DockerContainerID *string
-       ContainerUUID     string
-       Address           string // listen host:port; if port=0, Start() will change it to the selected port
-       AuthSecret        string
-       Log               interface {
+       ContainerUUID string
+       Address       string // listen host:port; if port=0, Start() will change it to the selected port
+       AuthSecret    string
+       Target        GatewayTarget
+       Log           interface {
                Printf(fmt string, args ...interface{})
        }
                Printf(fmt string, args ...interface{})
        }
-       // return local ip address of running container, or "" if not available
-       ContainerIPAddress func() (string, error)
 
        sshConfig   ssh.ServerConfig
        requestAuth string
 
        sshConfig   ssh.ServerConfig
        requestAuth string
@@ -241,15 +244,11 @@ func (gw *Gateway) handleDirectTCPIP(ctx context.Context, newch ssh.NewChannel)
                return
        }
 
                return
        }
 
-       var dstaddr string
-       if gw.ContainerIPAddress != nil {
-               dstaddr, err = gw.ContainerIPAddress()
-               if err != nil {
-                       fmt.Fprintf(ch.Stderr(), "container has no IP address: %s\n", err)
-                       return
-               }
-       }
-       if dstaddr == "" {
+       dstaddr, err := gw.Target.IPAddress()
+       if err != nil {
+               fmt.Fprintf(ch.Stderr(), "container has no IP address: %s\n", err)
+               return
+       } else if dstaddr == "" {
                fmt.Fprintf(ch.Stderr(), "container has no IP address\n")
                return
        }
                fmt.Fprintf(ch.Stderr(), "container has no IP address\n")
                return
        }
@@ -301,12 +300,25 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
                                execargs = []string{"/bin/bash", "-login"}
                        }
                        go func() {
                                execargs = []string{"/bin/bash", "-login"}
                        }
                        go func() {
-                               cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
+                               var resp struct {
+                                       Status uint32
+                               }
+                               defer func() {
+                                       ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
+                                       ch.Close()
+                               }()
+
+                               cmd, err := gw.Target.InjectCommand(ctx, detachKeys, username, tty0 != nil, execargs)
+                               if err != nil {
+                                       fmt.Fprintln(ch.Stderr(), err)
+                                       ch.CloseWrite()
+                                       resp.Status = 1
+                                       return
+                               }
                                cmd.Stdin = ch
                                cmd.Stdout = ch
                                cmd.Stderr = ch.Stderr()
                                if tty0 != nil {
                                cmd.Stdin = ch
                                cmd.Stdout = ch
                                cmd.Stderr = ch.Stderr()
                                if tty0 != nil {
-                                       cmd.Args = append(cmd.Args, "-t")
                                        cmd.Stdin = tty0
                                        cmd.Stdout = tty0
                                        cmd.Stderr = tty0
                                        cmd.Stdin = tty0
                                        cmd.Stdout = tty0
                                        cmd.Stderr = tty0
@@ -318,17 +330,12 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
                                        // Send our own debug messages to tty as well.
                                        logw = tty0
                                }
                                        // Send our own debug messages to tty as well.
                                        logw = tty0
                                }
-                               cmd.Args = append(cmd.Args, *gw.DockerContainerID)
-                               cmd.Args = append(cmd.Args, execargs...)
                                cmd.SysProcAttr = &syscall.SysProcAttr{
                                        Setctty: tty0 != nil,
                                        Setsid:  true,
                                }
                                cmd.Env = append(os.Environ(), termEnv...)
                                cmd.SysProcAttr = &syscall.SysProcAttr{
                                        Setctty: tty0 != nil,
                                        Setsid:  true,
                                }
                                cmd.Env = append(os.Environ(), termEnv...)
-                               err := cmd.Run()
-                               var resp struct {
-                                       Status uint32
-                               }
+                               err = cmd.Run()
                                if exiterr, ok := err.(*exec.ExitError); ok {
                                        if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
                                                resp.Status = uint32(status.ExitStatus())
                                if exiterr, ok := err.(*exec.ExitError); ok {
                                        if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
                                                resp.Status = uint32(status.ExitStatus())
@@ -341,8 +348,6 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
                                if resp.Status == 0 && (err != nil || errClose != nil) {
                                        resp.Status = 1
                                }
                                if resp.Status == 0 && (err != nil || errClose != nil) {
                                        resp.Status = 1
                                }
-                               ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
-                               ch.Close()
                        }()
                case "pty-req":
                        eol = "\r\n"
                        }()
                case "pty-req":
                        eol = "\r\n"
@@ -398,31 +403,3 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta
                }
        }
 }
                }
        }
 }
-
-func dockerContainerIPAddress(containerID *string) func() (string, error) {
-       var saved atomic.Value
-       return func() (string, error) {
-               if ip, ok := saved.Load().(*string); ok {
-                       return *ip, nil
-               }
-               docker, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-               if err != nil {
-                       return "", fmt.Errorf("cannot create docker client: %s", err)
-               }
-               ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
-               defer cancel()
-               ctr, err := docker.ContainerInspect(ctx, *containerID)
-               if err != nil {
-                       return "", fmt.Errorf("cannot get docker container info: %s", err)
-               }
-               ip := ctr.NetworkSettings.IPAddress
-               if ip == "" {
-                       // TODO: try to enable networking if it wasn't
-                       // already enabled when the container was
-                       // created.
-                       return "", fmt.Errorf("container has no IP address")
-               }
-               saved.Store(&ip)
-               return ip, nil
-       }
-}
index 474fbf4ade16cb6e6b0894ea7bb47524ac0eef09..0364db78e4b8597f4f51e6a1ef22450f8b734cbf 100644 (file)
@@ -1901,14 +1901,13 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                // dispatcher did not tell us which external IP
                // address to advertise --> no gateway service
                cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
                // dispatcher did not tell us which external IP
                // address to advertise --> no gateway service
                cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
-       } else if de, ok := cr.executor.(*dockerExecutor); ok {
+       } else {
                cr.gateway = Gateway{
                cr.gateway = Gateway{
-                       Address:            gwListen,
-                       AuthSecret:         gwAuthSecret,
-                       ContainerUUID:      containerUUID,
-                       DockerContainerID:  &de.containerID,
-                       Log:                cr.CrunchLog,
-                       ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+                       Address:       gwListen,
+                       AuthSecret:    gwAuthSecret,
+                       ContainerUUID: containerUUID,
+                       Target:        cr.executor,
+                       Log:           cr.CrunchLog,
                }
                err = cr.gateway.Start()
                if err != nil {
                }
                err = cr.gateway.Start()
                if err != nil {
index 1d2c7b09fd0773466f54a0846a5507ad64627623..9e2286d6881d4cb005b331a23b4fa7b907de6727 100644 (file)
@@ -136,6 +136,10 @@ func (e *stubExecutor) Close()                          { e.closed = true }
 func (e *stubExecutor) Wait(context.Context) (int, error) {
        return <-e.exit, e.waitErr
 }
 func (e *stubExecutor) Wait(context.Context) (int, error) {
        return <-e.exit, e.waitErr
 }
+func (e *stubExecutor) InjectCommand(ctx context.Context, _, _ string, _ bool, _ []string) (*exec.Cmd, error) { // stub: ignores every argument and always fails with "unimplemented"
+       return nil, errors.New("unimplemented")
+}
+func (e *stubExecutor) IPAddress() (string, error) { return "", errors.New("unimplemented") } // stub: never reports an address, always fails with "unimplemented"
 
 const fakeInputCollectionPDH = "ffffffffaaaaaaaa88888888eeeeeeee+1234"
 
 
 const fakeInputCollectionPDH = "ffffffffaaaaaaaa88888888eeeeeeee+1234"
 
index e62f2a39ba36e0b23d149d50f5062344a1f0aff1..dde96b08e7319715ce3822a91cc7c094aaead09f 100644 (file)
@@ -8,7 +8,9 @@ import (
        "io"
        "io/ioutil"
        "os"
        "io"
        "io/ioutil"
        "os"
+       "os/exec"
        "strings"
        "strings"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -27,6 +29,7 @@ type dockerExecutor struct {
        watchdogInterval time.Duration
        dockerclient     *dockerclient.Client
        containerID      string
        watchdogInterval time.Duration
        dockerclient     *dockerclient.Client
        containerID      string
+       savedIPAddress   atomic.Value
        doneIO           chan struct{}
        errIO            error
 }
        doneIO           chan struct{}
        errIO            error
 }
@@ -297,3 +300,34 @@ func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.
 func (e *dockerExecutor) Close() {
        e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
 }
 func (e *dockerExecutor) Close() {
        e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
 }
+
+func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) { // InjectCommand builds, but does not start, a "docker exec" command that runs injectcmd in this executor's container; the returned error is always nil.
+       cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username) // bound to ctx; "-i" keeps stdin open (docker exec option)
+       if usingTTY {
+               cmd.Args = append(cmd.Args, "-t") // caller has a pseudo-terminal, so request one from docker as well
+       }
+       cmd.Args = append(cmd.Args, e.containerID) // options come first, then the container ID...
+       cmd.Args = append(cmd.Args, injectcmd...) // ...then the command and its arguments
+       return cmd, nil
+}
+
+func (e *dockerExecutor) IPAddress() (string, error) { // IPAddress returns the container's IP address as reported by a docker container inspect call, remembering the first successful answer in savedIPAddress.
+       if ip, ok := e.savedIPAddress.Load().(*string); ok { // fast path: an earlier call already stored the address
+               return *ip, nil
+       }
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) // give the inspect call at most one minute
+       defer cancel()
+       ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
+       if err != nil {
+               return "", fmt.Errorf("cannot get docker container info: %s", err)
+       }
+       ip := ctr.NetworkSettings.IPAddress
+       if ip == "" { // inspect succeeded but reported no address; this is returned as an error and not cached
+               // TODO: try to enable networking if it wasn't
+               // already enabled when the container was
+               // created.
+               return "", fmt.Errorf("container has no IP address")
+       }
+       e.savedIPAddress.Store(&ip) // only a non-empty address is stored, so failed lookups are retried on the next call
+       return ip, nil
+}
index dc1bc20b7c3a110269d1f95441a6c7e75af48ace..b5b7884339152acab09778c689863afd2c062941 100644 (file)
@@ -62,4 +62,6 @@ type containerExecutor interface {
 
        // Name of runtime engine ("docker", "singularity")
        Runtime() string
 
        // Name of runtime engine ("docker", "singularity")
        Runtime() string
+
+       GatewayTarget
 }
 }
index 99af0530ff35dd55e2163c8726e09699b06d5852..1c963f92110f20cbd7ef1723466a04e2c3c7f4e0 100644 (file)
@@ -8,6 +8,7 @@ import (
        "bytes"
        "io"
        "io/ioutil"
        "bytes"
        "io"
        "io/ioutil"
+       "net"
        "net/http"
        "os"
        "strings"
        "net/http"
        "os"
        "strings"
@@ -174,6 +175,61 @@ func (s *executorSuite) TestExecStdoutStderr(c *C) {
        c.Check(s.stderr.String(), Equals, "barwaz\n")
 }
 
        c.Check(s.stderr.String(), Equals, "barwaz\n")
 }
 
+func (s *executorSuite) TestIPAddress(c *C) { // TestIPAddress starts a network-enabled container listening on port 1951 and checks that an HTTP request to the address returned by the executor's IPAddress() gets the expected 418 response.
+       s.spec.Command = []string{"nc", "-l", "-p", "1951", "-e", "printf", `HTTP/1.1 418 I'm a teapot\r\n\r\n`} // nc listens on 1951 and runs printf to emit a canned response (presumably busybox-style "nc -e" -- confirm against the test image)
+       s.spec.EnableNetwork = true
+       c.Assert(s.executor.Create(s.spec), IsNil)
+       c.Assert(s.executor.Start(), IsNil)
+       starttime := time.Now()
+
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) // 2s budget for the polling loop and for Wait below
+       defer cancel()
+
+       for ctx.Err() == nil { // poll about every 100ms until IPAddress succeeds or the deadline passes
+               time.Sleep(time.Second / 10)
+               _, err := s.executor.IPAddress()
+               if err == nil {
+                       break
+               }
+       }
+       ip, err := s.executor.IPAddress() // ask once more so the checks below see the final address, or the last error if polling timed out
+       if c.Check(err, IsNil) && c.Check(ip, Not(Equals), "") {
+               req, err := http.NewRequest("BREW", "http://"+net.JoinHostPort(ip, "1951"), nil)
+               c.Assert(err, IsNil)
+               resp, err := http.DefaultClient.Do(req) // NOTE(review): resp.Body is never closed in this test
+               c.Assert(err, IsNil)
+               c.Check(resp.StatusCode, Equals, http.StatusTeapot)
+       }
+
+       s.executor.Stop()
+       code, _ := s.executor.Wait(ctx)
+       c.Logf("container ran for %v", time.Now().Sub(starttime))
+       c.Check(code, Equals, -1) // the test expects Wait to report -1 for a container ended by Stop()
+}
+
+func (s *executorSuite) TestInject(c *C) { // TestInject starts a container running "sleep 10", injects "cat /proc/1/cmdline" through InjectCommand, and checks that the output is the container's own command line.
+       s.spec.Command = []string{"sleep", "10"}
+       c.Assert(s.executor.Create(s.spec), IsNil)
+       c.Assert(s.executor.Start(), IsNil)
+       starttime := time.Now()
+
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) // 2s budget shared by the injected command and Wait below
+       defer cancel()
+
+       injectcmd := []string{"cat", "/proc/1/cmdline"}
+       cmd, err := s.executor.InjectCommand(ctx, "", "root", false, injectcmd) // no detach keys, user "root", no TTY
+       c.Assert(err, IsNil)
+       out, err := cmd.CombinedOutput()
+       c.Logf("inject %s => %q", injectcmd, out)
+       c.Check(err, IsNil)
+       c.Check(string(out), Equals, "sleep\00010\000") // /proc/<pid>/cmdline is NUL-separated, so this is PID 1's argv: "sleep", "10"
+
+       s.executor.Stop()
+       code, _ := s.executor.Wait(ctx)
+       c.Logf("container ran for %v", time.Now().Sub(starttime))
+       c.Check(code, Equals, -1) // the test expects Wait to report -1 for a container ended by Stop()
+}
+
 func (s *executorSuite) checkRun(c *C, expectCode int) {
        c.Assert(s.executor.Create(s.spec), IsNil)
        c.Assert(s.executor.Start(), IsNil)
 func (s *executorSuite) checkRun(c *C, expectCode int) {
        c.Assert(s.executor.Create(s.spec), IsNil)
        c.Assert(s.executor.Start(), IsNil)
index 64a3773250701ecd62832e52e88a5fcf8a2b3da2..6ba65200d2741b8ac66bb3dd109986a152e30d3f 100644 (file)
@@ -5,6 +5,7 @@
 package crunchrun
 
 import (
 package crunchrun
 
 import (
+       "errors"
        "fmt"
        "io/ioutil"
        "os"
        "fmt"
        "io/ioutil"
        "os"
@@ -349,3 +350,11 @@ func (e *singularityExecutor) Close() {
                e.logf("error removing temp dir: %s", err)
        }
 }
                e.logf("error removing temp dir: %s", err)
        }
 }
+
+func (e *singularityExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) { // placeholder for the singularity runtime: ignores its arguments and always fails with "unimplemented"
+       return nil, errors.New("unimplemented")
+}
+
+func (e *singularityExecutor) IPAddress() (string, error) { // placeholder for the singularity runtime: never reports an address, always fails with "unimplemented"
+       return "", errors.New("unimplemented")
+}