"os"
"os/exec"
"sync"
- "sync/atomic"
"syscall"
- "time"
"git.arvados.org/arvados.git/lib/selfsigned"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
"github.com/creack/pty"
- dockerclient "github.com/docker/docker/client"
"github.com/google/shlex"
"golang.org/x/crypto/ssh"
"golang.org/x/net/context"
)
+type GatewayTarget interface {
+ // Command that will execute cmd inside the container
+ InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, cmd []string) (*exec.Cmd, error)
+
+ // IP address inside container
+ IPAddress() (string, error)
+}
+
type Gateway struct {
- DockerContainerID *string
- ContainerUUID string
- Address string // listen host:port; if port=0, Start() will change it to the selected port
- AuthSecret string
- Log interface {
+ ContainerUUID string
+ Address string // listen host:port; if port=0, Start() will change it to the selected port
+ AuthSecret string
+ Target GatewayTarget
+ Log interface {
Printf(fmt string, args ...interface{})
}
- // return local ip address of running container, or "" if not available
- ContainerIPAddress func() (string, error)
sshConfig ssh.ServerConfig
requestAuth string
return
}
- var dstaddr string
- if gw.ContainerIPAddress != nil {
- dstaddr, err = gw.ContainerIPAddress()
- if err != nil {
- fmt.Fprintf(ch.Stderr(), "container has no IP address: %s\n", err)
- return
- }
- }
- if dstaddr == "" {
+ dstaddr, err := gw.Target.IPAddress()
+ if err != nil {
+ fmt.Fprintf(ch.Stderr(), "container has no IP address: %s\n", err)
+ return
+ } else if dstaddr == "" {
fmt.Fprintf(ch.Stderr(), "container has no IP address\n")
return
}
execargs = []string{"/bin/bash", "-login"}
}
go func() {
- cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
+ var resp struct {
+ Status uint32
+ }
+ defer func() {
+ ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
+ ch.Close()
+ }()
+
+ cmd, err := gw.Target.InjectCommand(ctx, detachKeys, username, tty0 != nil, execargs)
+ if err != nil {
+ fmt.Fprintln(ch.Stderr(), err)
+ ch.CloseWrite()
+ resp.Status = 1
+ return
+ }
cmd.Stdin = ch
cmd.Stdout = ch
cmd.Stderr = ch.Stderr()
if tty0 != nil {
- cmd.Args = append(cmd.Args, "-t")
cmd.Stdin = tty0
cmd.Stdout = tty0
cmd.Stderr = tty0
// Send our own debug messages to tty as well.
logw = tty0
}
- cmd.Args = append(cmd.Args, *gw.DockerContainerID)
- cmd.Args = append(cmd.Args, execargs...)
cmd.SysProcAttr = &syscall.SysProcAttr{
Setctty: tty0 != nil,
Setsid: true,
}
cmd.Env = append(os.Environ(), termEnv...)
- err := cmd.Run()
- var resp struct {
- Status uint32
- }
+ err = cmd.Run()
if exiterr, ok := err.(*exec.ExitError); ok {
if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
resp.Status = uint32(status.ExitStatus())
if resp.Status == 0 && (err != nil || errClose != nil) {
resp.Status = 1
}
- ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
- ch.Close()
}()
case "pty-req":
eol = "\r\n"
}
}
}
-
-func dockerContainerIPAddress(containerID *string) func() (string, error) {
- var saved atomic.Value
- return func() (string, error) {
- if ip, ok := saved.Load().(*string); ok {
- return *ip, nil
- }
- docker, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- if err != nil {
- return "", fmt.Errorf("cannot create docker client: %s", err)
- }
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
- defer cancel()
- ctr, err := docker.ContainerInspect(ctx, *containerID)
- if err != nil {
- return "", fmt.Errorf("cannot get docker container info: %s", err)
- }
- ip := ctr.NetworkSettings.IPAddress
- if ip == "" {
- // TODO: try to enable networking if it wasn't
- // already enabled when the container was
- // created.
- return "", fmt.Errorf("container has no IP address")
- }
- saved.Store(&ip)
- return ip, nil
- }
-}
// dispatcher did not tell us which external IP
// address to advertise --> no gateway service
cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
- } else if de, ok := cr.executor.(*dockerExecutor); ok {
+ } else {
cr.gateway = Gateway{
- Address: gwListen,
- AuthSecret: gwAuthSecret,
- ContainerUUID: containerUUID,
- DockerContainerID: &de.containerID,
- Log: cr.CrunchLog,
- ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+ Address: gwListen,
+ AuthSecret: gwAuthSecret,
+ ContainerUUID: containerUUID,
+ Target: cr.executor,
+ Log: cr.CrunchLog,
}
err = cr.gateway.Start()
if err != nil {
func (e *stubExecutor) Wait(context.Context) (int, error) {
return <-e.exit, e.waitErr
}
+func (e *stubExecutor) InjectCommand(ctx context.Context, _, _ string, _ bool, _ []string) (*exec.Cmd, error) {
+ return nil, errors.New("unimplemented")
+}
+func (e *stubExecutor) IPAddress() (string, error) { return "", errors.New("unimplemented") }
const fakeInputCollectionPDH = "ffffffffaaaaaaaa88888888eeeeeeee+1234"
"io"
"io/ioutil"
"os"
+ "os/exec"
"strings"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
watchdogInterval time.Duration
dockerclient *dockerclient.Client
containerID string
+ savedIPAddress atomic.Value
doneIO chan struct{}
errIO error
}
func (e *dockerExecutor) Close() {
e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
}
+
+func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
+ cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
+ if usingTTY {
+ cmd.Args = append(cmd.Args, "-t")
+ }
+ cmd.Args = append(cmd.Args, e.containerID)
+ cmd.Args = append(cmd.Args, injectcmd...)
+ return cmd, nil
+}
+
+func (e *dockerExecutor) IPAddress() (string, error) {
+ if ip, ok := e.savedIPAddress.Load().(*string); ok {
+ return *ip, nil
+ }
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+ defer cancel()
+ ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
+ if err != nil {
+ return "", fmt.Errorf("cannot get docker container info: %s", err)
+ }
+ ip := ctr.NetworkSettings.IPAddress
+ if ip == "" {
+ // TODO: try to enable networking if it wasn't
+ // already enabled when the container was
+ // created.
+ return "", fmt.Errorf("container has no IP address")
+ }
+ e.savedIPAddress.Store(&ip)
+ return ip, nil
+}
// Name of runtime engine ("docker", "singularity")
Runtime() string
+
+ GatewayTarget
}
"bytes"
"io"
"io/ioutil"
+ "net"
"net/http"
"os"
"strings"
c.Check(s.stderr.String(), Equals, "barwaz\n")
}
+func (s *executorSuite) TestIPAddress(c *C) {
+ s.spec.Command = []string{"nc", "-l", "-p", "1951", "-e", "printf", `HTTP/1.1 418 I'm a teapot\r\n\r\n`}
+ s.spec.EnableNetwork = true
+ c.Assert(s.executor.Create(s.spec), IsNil)
+ c.Assert(s.executor.Start(), IsNil)
+ starttime := time.Now()
+
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
+ defer cancel()
+
+ for ctx.Err() == nil {
+ time.Sleep(time.Second / 10)
+ _, err := s.executor.IPAddress()
+ if err == nil {
+ break
+ }
+ }
+ ip, err := s.executor.IPAddress()
+ if c.Check(err, IsNil) && c.Check(ip, Not(Equals), "") {
+ req, err := http.NewRequest("BREW", "http://"+net.JoinHostPort(ip, "1951"), nil)
+ c.Assert(err, IsNil)
+ resp, err := http.DefaultClient.Do(req)
+ c.Assert(err, IsNil)
+ c.Check(resp.StatusCode, Equals, http.StatusTeapot)
+ }
+
+ s.executor.Stop()
+ code, _ := s.executor.Wait(ctx)
+ c.Logf("container ran for %v", time.Now().Sub(starttime))
+ c.Check(code, Equals, -1)
+}
+
+func (s *executorSuite) TestInject(c *C) {
+ s.spec.Command = []string{"sleep", "10"}
+ c.Assert(s.executor.Create(s.spec), IsNil)
+ c.Assert(s.executor.Start(), IsNil)
+ starttime := time.Now()
+
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
+ defer cancel()
+
+ injectcmd := []string{"cat", "/proc/1/cmdline"}
+ cmd, err := s.executor.InjectCommand(ctx, "", "root", false, injectcmd)
+ c.Assert(err, IsNil)
+ out, err := cmd.CombinedOutput()
+ c.Logf("inject %s => %q", injectcmd, out)
+ c.Check(err, IsNil)
+ c.Check(string(out), Equals, "sleep\00010\000")
+
+ s.executor.Stop()
+ code, _ := s.executor.Wait(ctx)
+ c.Logf("container ran for %v", time.Now().Sub(starttime))
+ c.Check(code, Equals, -1)
+}
+
func (s *executorSuite) checkRun(c *C, expectCode int) {
c.Assert(s.executor.Create(s.spec), IsNil)
c.Assert(s.executor.Start(), IsNil)
package crunchrun
import (
+ "errors"
"fmt"
"io/ioutil"
"os"
e.logf("error removing temp dir: %s", err)
}
}
+
+func (e *singularityExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
+ return nil, errors.New("unimplemented")
+}
+
+func (e *singularityExecutor) IPAddress() (string, error) {
+ return "", errors.New("unimplemented")
+}