From: Tom Clegg Date: Thu, 12 May 2022 14:12:29 +0000 (-0400) Subject: 19099: Refactor container shell backend so it's not docker-specific. X-Git-Tag: 2.5.0~163^2~12 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/7ebe828a435dcaa1b5668b72adbaad495059f211 19099: Refactor container shell backend so it's not docker-specific. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/lib/crunchrun/container_gateway.go b/lib/crunchrun/container_gateway.go index 2ec24bac78..62979da21b 100644 --- a/lib/crunchrun/container_gateway.go +++ b/lib/crunchrun/container_gateway.go @@ -17,30 +17,33 @@ import ( "os" "os/exec" "sync" - "sync/atomic" "syscall" - "time" "git.arvados.org/arvados.git/lib/selfsigned" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/httpserver" "github.com/creack/pty" - dockerclient "github.com/docker/docker/client" "github.com/google/shlex" "golang.org/x/crypto/ssh" "golang.org/x/net/context" ) +type GatewayTarget interface { + // Command that will execute cmd inside the container + InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, cmd []string) (*exec.Cmd, error) + + // IP address inside container + IPAddress() (string, error) +} + type Gateway struct { - DockerContainerID *string - ContainerUUID string - Address string // listen host:port; if port=0, Start() will change it to the selected port - AuthSecret string - Log interface { + ContainerUUID string + Address string // listen host:port; if port=0, Start() will change it to the selected port + AuthSecret string + Target GatewayTarget + Log interface { Printf(fmt string, args ...interface{}) } - // return local ip address of running container, or "" if not available - ContainerIPAddress func() (string, error) sshConfig ssh.ServerConfig requestAuth string @@ -241,15 +244,11 @@ func (gw *Gateway) handleDirectTCPIP(ctx context.Context, newch ssh.NewChannel) return } - var dstaddr string - if gw.ContainerIPAddress != nil { - dstaddr, err = gw.ContainerIPAddress() - if err != nil { - fmt.Fprintf(ch.Stderr(), "container has no IP address: %s\n", err) - return - } - } - if dstaddr == "" { + dstaddr, err := gw.Target.IPAddress() + if err != nil { + fmt.Fprintf(ch.Stderr(), "container has no IP address: %s\n", err) + return + } else if dstaddr == "" { fmt.Fprintf(ch.Stderr(), "container has no IP address\n") return } @@ -301,12 +300,25 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta execargs = []string{"/bin/bash", "-login"} } go func() { - cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username) + var resp struct { + Status uint32 + } + defer func() { + ch.SendRequest("exit-status", false, ssh.Marshal(&resp)) + ch.Close() + }() + + cmd, err := gw.Target.InjectCommand(ctx, detachKeys, username, tty0 != nil, execargs) + if err != nil { + fmt.Fprintln(ch.Stderr(), err) + ch.CloseWrite() + resp.Status = 1 + return + } cmd.Stdin = ch cmd.Stdout = ch cmd.Stderr = ch.Stderr() if tty0 != nil { - cmd.Args = append(cmd.Args, "-t") cmd.Stdin = tty0 cmd.Stdout = tty0 cmd.Stderr = tty0 @@ -318,17 +330,12 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta // Send our own debug messages to tty as well. logw = tty0 } - cmd.Args = append(cmd.Args, *gw.DockerContainerID) - cmd.Args = append(cmd.Args, execargs...) cmd.SysProcAttr = &syscall.SysProcAttr{ Setctty: tty0 != nil, Setsid: true, } cmd.Env = append(os.Environ(), termEnv...) - err := cmd.Run() - var resp struct { - Status uint32 - } + err = cmd.Run() if exiterr, ok := err.(*exec.ExitError); ok { if status, ok := exiterr.Sys().(syscall.WaitStatus); ok { resp.Status = uint32(status.ExitStatus()) @@ -341,8 +348,6 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta if resp.Status == 0 && (err != nil || errClose != nil) { resp.Status = 1 } - ch.SendRequest("exit-status", false, ssh.Marshal(&resp)) - ch.Close() }() case "pty-req": eol = "\r\n" @@ -398,31 +403,3 @@ func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, deta } } } - -func dockerContainerIPAddress(containerID *string) func() (string, error) { - var saved atomic.Value - return func() (string, error) { - if ip, ok := saved.Load().(*string); ok { - return *ip, nil - } - docker, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) - if err != nil { - return "", fmt.Errorf("cannot create docker client: %s", err) - } - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) - defer cancel() - ctr, err := docker.ContainerInspect(ctx, *containerID) - if err != nil { - return "", fmt.Errorf("cannot get docker container info: %s", err) - } - ip := ctr.NetworkSettings.IPAddress - if ip == "" { - // TODO: try to enable networking if it wasn't - // already enabled when the container was - // created. - return "", fmt.Errorf("container has no IP address") - } - saved.Store(&ip) - return ip, nil - } -} diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 474fbf4ade..0364db78e4 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -1901,14 +1901,13 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s // dispatcher did not tell us which external IP // address to advertise --> no gateway service cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)") - } else if de, ok := cr.executor.(*dockerExecutor); ok { + } else { cr.gateway = Gateway{ - Address: gwListen, - AuthSecret: gwAuthSecret, - ContainerUUID: containerUUID, - DockerContainerID: &de.containerID, - Log: cr.CrunchLog, - ContainerIPAddress: dockerContainerIPAddress(&de.containerID), + Address: gwListen, + AuthSecret: gwAuthSecret, + ContainerUUID: containerUUID, + Target: cr.executor, + Log: cr.CrunchLog, } err = cr.gateway.Start() if err != nil { diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go index 1d2c7b09fd..9e2286d688 100644 --- a/lib/crunchrun/crunchrun_test.go +++ b/lib/crunchrun/crunchrun_test.go @@ -136,6 +136,10 @@ func (e *stubExecutor) Close() { e.closed = true } func (e *stubExecutor) Wait(context.Context) (int, error) { return <-e.exit, e.waitErr } +func (e *stubExecutor) InjectCommand(ctx context.Context, _, _ string, _ bool, _ []string) (*exec.Cmd, error) { + return nil, errors.New("unimplemented") +} +func (e *stubExecutor) IPAddress() (string, error) { return "", errors.New("unimplemented") } const fakeInputCollectionPDH = "ffffffffaaaaaaaa88888888eeeeeeee+1234" diff --git a/lib/crunchrun/docker.go b/lib/crunchrun/docker.go index e62f2a39ba..dde96b08e7 100644 --- a/lib/crunchrun/docker.go +++ b/lib/crunchrun/docker.go @@ -8,7 +8,9 @@ import ( "io" "io/ioutil" "os" + "os/exec" "strings" + "sync/atomic" "time" "git.arvados.org/arvados.git/sdk/go/arvados" @@ -27,6 +29,7 @@ type dockerExecutor struct { watchdogInterval time.Duration dockerclient *dockerclient.Client containerID string + savedIPAddress atomic.Value doneIO chan struct{} errIO error } @@ -297,3 +300,34 @@ func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io. func (e *dockerExecutor) Close() { e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true}) } + +func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) { + cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username) + if usingTTY { + cmd.Args = append(cmd.Args, "-t") + } + cmd.Args = append(cmd.Args, e.containerID) + cmd.Args = append(cmd.Args, injectcmd...) + return cmd, nil +} + +func (e *dockerExecutor) IPAddress() (string, error) { + if ip, ok := e.savedIPAddress.Load().(*string); ok { + return *ip, nil + } + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) + defer cancel() + ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID) + if err != nil { + return "", fmt.Errorf("cannot get docker container info: %s", err) + } + ip := ctr.NetworkSettings.IPAddress + if ip == "" { + // TODO: try to enable networking if it wasn't + // already enabled when the container was + // created. + return "", fmt.Errorf("container has no IP address") + } + e.savedIPAddress.Store(&ip) + return ip, nil +} diff --git a/lib/crunchrun/executor.go b/lib/crunchrun/executor.go index dc1bc20b7c..b5b7884339 100644 --- a/lib/crunchrun/executor.go +++ b/lib/crunchrun/executor.go @@ -62,4 +62,6 @@ type containerExecutor interface { // Name of runtime engine ("docker", "singularity") Runtime() string + + GatewayTarget } diff --git a/lib/crunchrun/executor_test.go b/lib/crunchrun/executor_test.go index 99af0530ff..1c963f9211 100644 --- a/lib/crunchrun/executor_test.go +++ b/lib/crunchrun/executor_test.go @@ -8,6 +8,7 @@ import ( "bytes" "io" "io/ioutil" + "net" "net/http" "os" "strings" @@ -174,6 +175,61 @@ func (s *executorSuite) TestExecStdoutStderr(c *C) { c.Check(s.stderr.String(), Equals, "barwaz\n") } +func (s *executorSuite) TestIPAddress(c *C) { + s.spec.Command = []string{"nc", "-l", "-p", "1951", "-e", "printf", `HTTP/1.1 418 I'm a teapot\r\n\r\n`} + s.spec.EnableNetwork = true + c.Assert(s.executor.Create(s.spec), IsNil) + c.Assert(s.executor.Start(), IsNil) + starttime := time.Now() + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) + defer cancel() + + for ctx.Err() == nil { + time.Sleep(time.Second / 10) + _, err := s.executor.IPAddress() + if err == nil { + break + } + } + ip, err := s.executor.IPAddress() + if c.Check(err, IsNil) && c.Check(ip, Not(Equals), "") { + req, err := http.NewRequest("BREW", "http://"+net.JoinHostPort(ip, "1951"), nil) + c.Assert(err, IsNil) + resp, err := http.DefaultClient.Do(req) + c.Assert(err, IsNil) + c.Check(resp.StatusCode, Equals, http.StatusTeapot) + } + + s.executor.Stop() + code, _ := s.executor.Wait(ctx) + c.Logf("container ran for %v", time.Now().Sub(starttime)) + c.Check(code, Equals, -1) +} + +func (s *executorSuite) TestInject(c *C) { + s.spec.Command = []string{"sleep", "10"} + c.Assert(s.executor.Create(s.spec), IsNil) + c.Assert(s.executor.Start(), IsNil) + starttime := time.Now() + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) + defer cancel() + + injectcmd := []string{"cat", "/proc/1/cmdline"} + cmd, err := s.executor.InjectCommand(ctx, "", "root", false, injectcmd) + c.Assert(err, IsNil) + out, err := cmd.CombinedOutput() + c.Logf("inject %s => %q", injectcmd, out) + c.Check(err, IsNil) + c.Check(string(out), Equals, "sleep\00010\000") + + s.executor.Stop() + code, _ := s.executor.Wait(ctx) + c.Logf("container ran for %v", time.Now().Sub(starttime)) + c.Check(code, Equals, -1) +} + func (s *executorSuite) checkRun(c *C, expectCode int) { c.Assert(s.executor.Create(s.spec), IsNil) c.Assert(s.executor.Start(), IsNil) diff --git a/lib/crunchrun/singularity.go b/lib/crunchrun/singularity.go index 64a3773250..6ba65200d2 100644 --- a/lib/crunchrun/singularity.go +++ b/lib/crunchrun/singularity.go @@ -5,6 +5,7 @@ package crunchrun import ( + "errors" "fmt" "io/ioutil" "os" @@ -349,3 +350,11 @@ func (e *singularityExecutor) Close() { e.logf("error removing temp dir: %s", err) } } + +func (e *singularityExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) { + return nil, errors.New("unimplemented") +} + +func (e *singularityExecutor) IPAddress() (string, error) { + return "", errors.New("unimplemented") +}