"--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
- if runner.executor.Runtime() == "docker" {
+ if _, isdocker := runner.executor.(*dockerExecutor); isdocker {
arvMountCmd = append(arvMountCmd, "--allow-other")
}
func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
runner.CrunchLog.Printf("%s", currentUserAndGroups())
- runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime())
+ v, _ := exec.Command("arv-mount", "--version").CombinedOutput()
+ runner.CrunchLog.Printf("Using FUSE mount: %s", v)
+ runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
+ runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
hostname, hosterr := os.Hostname()
if hosterr != nil {
// dispatcher did not tell us which external IP
// address to advertise --> no gateway service
cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
- } else if de, ok := cr.executor.(*dockerExecutor); ok {
+ } else {
cr.gateway = Gateway{
- Address: gwListen,
- AuthSecret: gwAuthSecret,
- ContainerUUID: containerUUID,
- DockerContainerID: &de.containerID,
- Log: cr.CrunchLog,
- ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+ Address: gwListen,
+ AuthSecret: gwAuthSecret,
+ ContainerUUID: containerUUID,
+ Target: cr.executor,
+ Log: cr.CrunchLog,
}
err = cr.gateway.Start()
if err != nil {
"testing"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
return e.loadErr
}
func (e *stubExecutor) Runtime() string { return "stub" }
+func (e *stubExecutor) Version() string { return "stub " + cmd.Version.String() }
func (e *stubExecutor) Create(spec containerSpec) error { e.created = spec; return e.createErr }
func (e *stubExecutor) Start() error { e.exit = make(chan int, 1); go e.runFunc(); return e.startErr }
func (e *stubExecutor) CgroupID() string { return "cgroupid" }
func (e *stubExecutor) Wait(context.Context) (int, error) {
return <-e.exit, e.waitErr
}
+ func (e *stubExecutor) InjectCommand(ctx context.Context, _, _ string, _ bool, _ []string) (*exec.Cmd, error) {
+ return nil, errors.New("unimplemented")
+ }
+ func (e *stubExecutor) IPAddress() (string, error) { return "", errors.New("unimplemented") }
const fakeInputCollectionPDH = "ffffffffaaaaaaaa88888888eeeeeeee+1234"
c.Assert(s.api.Logs["crunch-run"], NotNil)
c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container 'zzzzz-zzzzz-zzzzzzzzzzzzzzz' using stub runtime.*`)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
}
func (s *TestSuite) TestContainerRecordLog(c *C) {
"io"
"io/ioutil"
"os"
+ "os/exec"
"strings"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
watchdogInterval time.Duration
dockerclient *dockerclient.Client
containerID string
+ savedIPAddress atomic.Value
doneIO chan struct{}
errIO error
}
}, err
}
-func (e *dockerExecutor) Runtime() string { return "docker" }
+func (e *dockerExecutor) Runtime() string {
+ v, _ := e.dockerclient.ServerVersion(context.Background())
+ info := ""
+ for _, cv := range v.Components {
+ if info != "" {
+ info += ", "
+ }
+ info += cv.Name + " " + cv.Version
+ }
+ if info == "" {
+ info = "(unknown version)"
+ }
+ return "docker " + info
+}
func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
containerClient *arvados.Client) error {
func (e *dockerExecutor) Close() {
e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
}
+
+ func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
+ cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
+ if usingTTY {
+ cmd.Args = append(cmd.Args, "-t")
+ }
+ cmd.Args = append(cmd.Args, e.containerID)
+ cmd.Args = append(cmd.Args, injectcmd...)
+ return cmd, nil
+ }
+
+ func (e *dockerExecutor) IPAddress() (string, error) {
+ if ip, ok := e.savedIPAddress.Load().(*string); ok {
+ return *ip, nil
+ }
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+ defer cancel()
+ ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
+ if err != nil {
+ return "", fmt.Errorf("cannot get docker container info: %s", err)
+ }
+ ip := ctr.NetworkSettings.IPAddress
+ if ip == "" {
+ // TODO: try to enable networking if it wasn't
+ // already enabled when the container was
+ // created.
+ return "", fmt.Errorf("container has no IP address")
+ }
+ e.savedIPAddress.Store(&ip)
+ return ip, nil
+ }
// Release resources (temp dirs, stopped containers)
Close()
- // Name of runtime engine ("docker", "singularity")
+ // Name and version of runtime engine ("docker 20.10.16", "singularity-ce version 3.9.9")
Runtime() string
+
+ GatewayTarget
}
import (
"bytes"
+ "fmt"
"io"
"io/ioutil"
+ "net"
"net/http"
"os"
"strings"
}
func (s *executorSuite) TestExecTrivialContainer(c *C) {
+ c.Logf("Using container runtime: %s", s.executor.Runtime())
s.spec.Command = []string{"echo", "ok"}
s.checkRun(c, 0)
c.Check(s.stdout.String(), Equals, "ok\n")
c.Check(s.stderr.String(), Equals, "barwaz\n")
}
+ func (s *executorSuite) TestIPAddress(c *C) {
+ // Listen on an available port on the host.
+ ln, err := net.Listen("tcp", net.JoinHostPort("0.0.0.0", "0"))
+ c.Assert(err, IsNil)
+ defer ln.Close()
+ _, port, err := net.SplitHostPort(ln.Addr().String())
+ c.Assert(err, IsNil)
+
+ // Start a container that listens on the same port number that
+ // is already in use on the host.
+ s.spec.Command = []string{"nc", "-l", "-p", port, "-e", "printf", `HTTP/1.1 418 I'm a teapot\r\n\r\n`}
+ s.spec.EnableNetwork = true
+ c.Assert(s.executor.Create(s.spec), IsNil)
+ c.Assert(s.executor.Start(), IsNil)
+ starttime := time.Now()
+
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+ defer cancel()
+
+ for ctx.Err() == nil {
+ time.Sleep(time.Second / 10)
+ _, err := s.executor.IPAddress()
+ if err == nil {
+ break
+ }
+ }
+ // When we connect to the port using s.executor.IPAddress(),
+ // we should reach the nc process running inside the
+ // container, not the net.Listen() running outside the
+ // container, even though both listen on the same port.
+ ip, err := s.executor.IPAddress()
+ if c.Check(err, IsNil) && c.Check(ip, Not(Equals), "") {
+ req, err := http.NewRequest("BREW", "http://"+net.JoinHostPort(ip, port), nil)
+ c.Assert(err, IsNil)
+ resp, err := http.DefaultClient.Do(req)
+ c.Assert(err, IsNil)
+ c.Check(resp.StatusCode, Equals, http.StatusTeapot)
+ }
+
+ s.executor.Stop()
+ code, _ := s.executor.Wait(ctx)
+ c.Logf("container ran for %v", time.Now().Sub(starttime))
+ c.Check(code, Equals, -1)
+
+ c.Logf("stdout:\n%s\n\n", s.stdout.String())
+ c.Logf("stderr:\n%s\n\n", s.stderr.String())
+ }
+
+ func (s *executorSuite) TestInject(c *C) {
+ hostdir := c.MkDir()
+ c.Assert(os.WriteFile(hostdir+"/testfile", []byte("first tube"), 0777), IsNil)
+ mountdir := fmt.Sprintf("/injecttest-%d", os.Getpid())
+ s.spec.Command = []string{"sleep", "10"}
+ s.spec.BindMounts = map[string]bindmount{mountdir: {HostPath: hostdir, ReadOnly: true}}
+ c.Assert(s.executor.Create(s.spec), IsNil)
+ c.Assert(s.executor.Start(), IsNil)
+ starttime := time.Now()
+
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
+ defer cancel()
+
+ // Allow InjectCommand to fail a few times while the container
+ // is starting
+ for ctx.Err() == nil {
+ _, err := s.executor.InjectCommand(ctx, "", "root", false, []string{"true"})
+ if err == nil {
+ break
+ }
+ time.Sleep(time.Second / 10)
+ }
+
+ injectcmd := []string{"cat", mountdir + "/testfile"}
+ cmd, err := s.executor.InjectCommand(ctx, "", "root", false, injectcmd)
+ c.Assert(err, IsNil)
+ out, err := cmd.CombinedOutput()
+ c.Logf("inject %s => %q", injectcmd, out)
+ c.Check(err, IsNil)
+ c.Check(string(out), Equals, "first tube")
+
+ s.executor.Stop()
+ code, _ := s.executor.Wait(ctx)
+ c.Logf("container ran for %v", time.Now().Sub(starttime))
+ c.Check(code, Equals, -1)
+ }
+
func (s *executorSuite) checkRun(c *C, expectCode int) {
c.Assert(s.executor.Create(s.spec), IsNil)
c.Assert(s.executor.Start(), IsNil)
package crunchrun
import (
+ "bytes"
+ "errors"
"fmt"
"io/ioutil"
+ "net"
"os"
"os/exec"
+ "os/user"
+ "regexp"
"sort"
+ "strconv"
+ "strings"
"syscall"
"time"
type singularityExecutor struct {
logf func(string, ...interface{})
+ fakeroot bool // use --fakeroot flag, allow --network=bridge when non-root (currently only used by tests)
spec containerSpec
tmpdir string
child *exec.Cmd
}, nil
}
-func (e *singularityExecutor) Runtime() string { return "singularity" }
+func (e *singularityExecutor) Runtime() string {
+ buf, err := exec.Command("singularity", "--version").CombinedOutput()
+ if err != nil {
+ return "singularity (unknown version)"
+ }
+ return strings.TrimSuffix(string(buf), "\n")
+}
func (e *singularityExecutor) getOrCreateProject(ownerUuid string, name string, containerClient *arvados.Client) (*arvados.Group, error) {
var gp arvados.GroupList
}
func (e *singularityExecutor) execCmd(path string) *exec.Cmd {
- args := []string{path, "exec", "--containall", "--cleanenv", "--pwd", e.spec.WorkingDir}
+ args := []string{path, "exec", "--containall", "--cleanenv", "--pwd=" + e.spec.WorkingDir}
+ if e.fakeroot {
+ args = append(args, "--fakeroot")
+ }
if !e.spec.EnableNetwork {
args = append(args, "--net", "--network=none")
+ } else if u, err := user.Current(); err == nil && u.Uid == "0" || e.fakeroot {
+ // Specifying --network=bridge fails unless (a) we are
+ // root, (b) we are using --fakeroot, or (c)
+ // singularity has been configured to allow our
+ // uid/gid to use it like so:
+ //
+ // singularity config global --set 'allow net networks' bridge
+ // singularity config global --set 'allow net groups' mygroup
+ args = append(args, "--net", "--network=bridge")
}
-
if e.spec.CUDADeviceCount != 0 {
args = append(args, "--nv")
}
for _, path := range binds {
mount := e.spec.BindMounts[path]
if path == e.spec.Env["HOME"] {
- // Singularity treates $HOME as special case
+ // Singularity treats $HOME as special case
args = append(args, "--home", mount.HostPath+":"+path)
} else {
args = append(args, "--bind", mount.HostPath+":"+path+":"+readonlyflag[mount.ReadOnly])
env := make([]string, 0, len(e.spec.Env))
for k, v := range e.spec.Env {
if k == "HOME" {
- // Singularity treates $HOME as special case, this is handled
- // with --home above
+ // Singularity treats $HOME as special case,
+ // this is handled with --home above
continue
}
env = append(env, "SINGULARITYENV_"+k+"="+v)
// us to select specific devices we need to propagate that.
env = append(env, "SINGULARITYENV_CUDA_VISIBLE_DEVICES="+cudaVisibleDevices)
}
+ // Singularity's default behavior is to evaluate each
+ // SINGULARITYENV_* env var with a shell as a double-quoted
+ // string and pass the result to the contained
+ // process. Singularity 3.10+ has an option to pass env vars
+ // through literally without evaluating, which is what we
+ // want. See https://github.com/sylabs/singularity/pull/704
+ // and https://dev.arvados.org/issues/19081
+ env = append(env, "SINGULARITY_NO_EVAL=1")
args = append(args, e.imageFilename)
args = append(args, e.spec.Command...)
e.logf("error removing temp dir: %s", err)
}
}
+
+ func (e *singularityExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
+ target, err := e.containedProcess()
+ if err != nil {
+ return nil, err
+ }
+ return exec.CommandContext(ctx, "nsenter", append([]string{fmt.Sprintf("--target=%d", target), "--all"}, injectcmd...)...), nil
+ }
+
+ var (
+ errContainerHasNoIPAddress = errors.New("container has no IP address distinct from host")
+ )
+
+ func (e *singularityExecutor) IPAddress() (string, error) {
+ target, err := e.containedProcess()
+ if err != nil {
+ return "", err
+ }
+ targetIPs, err := processIPs(target)
+ if err != nil {
+ return "", err
+ }
+ selfIPs, err := processIPs(os.Getpid())
+ if err != nil {
+ return "", err
+ }
+ for ip := range targetIPs {
+ if !selfIPs[ip] {
+ return ip, nil
+ }
+ }
+ return "", errContainerHasNoIPAddress
+ }
+
+ func processIPs(pid int) (map[string]bool, error) {
+ fibtrie, err := os.ReadFile(fmt.Sprintf("/proc/%d/net/fib_trie", pid))
+ if err != nil {
+ return nil, err
+ }
+
+ addrs := map[string]bool{}
+ // When we see a pair of lines like this:
+ //
+ // |-- 10.1.2.3
+ // /32 host LOCAL
+ //
+ // ...we set addrs["10.1.2.3"] = true
+ lines := bytes.Split(fibtrie, []byte{'\n'})
+ for linenumber, line := range lines {
+ if !bytes.HasSuffix(line, []byte("/32 host LOCAL")) {
+ continue
+ }
+ if linenumber < 1 {
+ continue
+ }
+ i := bytes.LastIndexByte(lines[linenumber-1], ' ')
+ if i < 0 || i >= len(line)-7 {
+ continue
+ }
+ addr := string(lines[linenumber-1][i+1:])
+ if net.ParseIP(addr).To4() != nil {
+ addrs[addr] = true
+ }
+ }
+ return addrs, nil
+ }
+
+ var (
+ errContainerNotStarted = errors.New("container has not started yet")
+ errCannotFindChild = errors.New("failed to find any process inside the container")
+ reProcStatusPPid = regexp.MustCompile(`\nPPid:\t(\d+)\n`)
+ )
+
+ // Return the PID of a process that is inside the container (not
+ // necessarily the topmost/pid=1 process in the container).
+ func (e *singularityExecutor) containedProcess() (int, error) {
+ if e.child == nil || e.child.Process == nil {
+ return 0, errContainerNotStarted
+ }
+ lsns, err := exec.Command("lsns").CombinedOutput()
+ if err != nil {
+ return 0, fmt.Errorf("lsns: %w", err)
+ }
+ for _, line := range bytes.Split(lsns, []byte{'\n'}) {
+ fields := bytes.Fields(line)
+ if len(fields) < 4 {
+ continue
+ }
+ if !bytes.Equal(fields[1], []byte("pid")) {
+ continue
+ }
+ pid, err := strconv.ParseInt(string(fields[3]), 10, 64)
+ if err != nil {
+ return 0, fmt.Errorf("error parsing PID field in lsns output: %q", fields[3])
+ }
+ for parent := pid; ; {
+ procstatus, err := os.ReadFile(fmt.Sprintf("/proc/%d/status", parent))
+ if err != nil {
+ break
+ }
+ m := reProcStatusPPid.FindSubmatch(procstatus)
+ if m == nil {
+ break
+ }
+ parent, err = strconv.ParseInt(string(m[1]), 10, 64)
+ if err != nil {
+ break
+ }
+ if int(parent) == e.child.Process.Pid {
+ return int(pid), nil
+ }
+ }
+ }
+ return 0, errCannotFindChild
+ }
package crunchrun
import (
+ "os"
"os/exec"
. "gopkg.in/check.v1"
+ check "gopkg.in/check.v1"
)
var _ = Suite(&singularitySuite{})
}
}
+ func (s *singularitySuite) TearDownSuite(c *C) {
+ if s.executor != nil {
+ s.executor.Close()
+ }
+ }
+
+ func (s *singularitySuite) TestIPAddress(c *C) {
+ // In production, executor will choose --network=bridge
+ // because uid=0 under arvados-dispatch-cloud. But in test
+ // cases, uid!=0, which means --network=bridge is conditional
+ // on --fakeroot.
+ uuc, err := os.ReadFile("/proc/sys/kernel/unprivileged_userns_clone")
+ c.Check(err, check.IsNil)
+ if string(uuc) == "0\n" {
+ c.Skip("insufficient privileges to run this test case -- `singularity exec --fakeroot` requires /proc/sys/kernel/unprivileged_userns_clone = 1")
+ }
+ s.executor.(*singularityExecutor).fakeroot = true
+ s.executorSuite.TestIPAddress(c)
+ }
+
+ func (s *singularitySuite) TestInject(c *C) {
+ path, err := exec.LookPath("nsenter")
+ if err != nil || path != "/var/lib/arvados/bin/nsenter" {
+ c.Skip("looks like /var/lib/arvados/bin/nsenter is not installed -- re-run `arvados-server install`?")
+ }
+ s.executorSuite.TestInject(c)
+ }
+
var _ = Suite(&singularityStubSuite{})
// singularityStubSuite tests don't really invoke singularity, so we
c.Check(err, IsNil)
e.imageFilename = "/fake/image.sif"
cmd := e.execCmd("./singularity")
- c.Check(cmd.Args, DeepEquals, []string{"./singularity", "exec", "--containall", "--cleanenv", "--pwd", "/WorkingDir", "--net", "--network=none", "--nv", "--bind", "/hostpath:/mnt:ro", "/fake/image.sif"})
+ c.Check(cmd.Args, DeepEquals, []string{"./singularity", "exec", "--containall", "--cleanenv", "--pwd=/WorkingDir", "--net", "--network=none", "--nv", "--bind", "/hostpath:/mnt:ro", "/fake/image.sif"})
- c.Check(cmd.Env, DeepEquals, []string{"SINGULARITYENV_FOO=bar"})
+ c.Check(cmd.Env, DeepEquals, []string{"SINGULARITYENV_FOO=bar", "SINGULARITY_NO_EVAL=1"})
}
}
if dev || test {
pkgs = append(pkgs, "squashfs-tools") // for singularity
+ pkgs = append(pkgs, "gnupg") // for docker install recipe
}
switch {
case osv.Debian && osv.Major >= 11:
}
}
+ if dev || test {
+ if havedockerversion, err := exec.Command("docker", "--version").CombinedOutput(); err == nil {
+ logger.Printf("%s installed, assuming that version is ok", bytes.TrimSuffix(havedockerversion, []byte("\n")))
+ } else if osv.Debian {
+ var codename string
+ switch osv.Major {
+ case 10:
+ codename = "buster"
+ case 11:
+ codename = "bullseye"
+ default:
+ err = fmt.Errorf("don't know how to install docker-ce for debian %d", osv.Major)
+ return 1
+ }
+ err = inst.runBash(`
+rm -f /usr/share/keyrings/docker-archive-keyring.gpg
+curl -fsSL https://download.docker.com/linux/debian/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
+echo 'deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian/ `+codename+` stable' | \
+ tee /etc/apt/sources.list.d/docker.list
+apt-get update
+DEBIAN_FRONTEND=noninteractive apt-get --yes --no-install-recommends install docker-ce
+`, stdout, stderr)
+ if err != nil {
+ return 1
+ }
+ } else {
+ err = fmt.Errorf("don't know how to install docker for osversion %v", osv)
+ return 1
+ }
+ }
+
os.Mkdir("/var/lib/arvados", 0755)
os.Mkdir("/var/lib/arvados/tmp", 0700)
if prod || pkg {
}
}
+ err = inst.runBash(`
+ install /usr/bin/nsenter /var/lib/arvados/bin/nsenter
+ setcap "cap_sys_admin+pei cap_sys_chroot+pei" /var/lib/arvados/bin/nsenter
+ `, stdout, stderr)
+ if err != nil {
+ return 1
+ }
+
// The entry in /etc/locale.gen is "en_US.UTF-8"; once
// it's installed, locale -a reports it as
// "en_US.utf8".