package crunchrun
import (
+ "context"
"fmt"
"io"
"io/ioutil"
"os"
+ "os/exec"
"strings"
+ "sync/atomic"
"time"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
dockertypes "github.com/docker/docker/api/types"
dockercontainer "github.com/docker/docker/api/types/container"
dockerclient "github.com/docker/docker/client"
- "golang.org/x/net/context"
)
// Docker daemon won't let you set a limit less than ~10 MiB
const minDockerRAM = int64(16 * 1024 * 1024)
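+// (Requests below this limit are rounded up to it in config(),
+// below, before the HostConfig resources are built.)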
+// DockerAPIVersion is the API version we use to communicate with the
+// docker service. The oldest OS we support is Ubuntu 18.04 (bionic)
+// which originally shipped docker 1.17.12 / API 1.35 so there is no
+// reason to use an older API version. See
+// https://dev.arvados.org/issues/15370#note-38 and
+// https://docs.docker.com/engine/api/.
+const DockerAPIVersion = "1.35"
+
+// Number of consecutive "inspect container" failures before
+// concluding Docker is unresponsive, giving up, and cancelling the
+// container.
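+// With the default two-minute watchdog interval, that allows
+// roughly ten minutes of unresponsiveness before giving up.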
+const dockerWatchdogThreshold = 5
+
type dockerExecutor struct {
containerUUID string
logf func(string, ...interface{})
watchdogInterval time.Duration
dockerclient *dockerclient.Client
containerID string
+ savedIPAddress atomic.Value
doneIO chan struct{}
errIO error
}
func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
-	// API version 1.21 corresponds to Docker 1.9, which is
-	// currently the minimum version we want to support.
- client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
+ client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, DockerAPIVersion, nil, nil)
if watchdogInterval < 1 {
- watchdogInterval = time.Minute
+ watchdogInterval = time.Minute * 2
}
	return &dockerExecutor{
		containerUUID:    containerUUID,
		logf:             logf,
		watchdogInterval: watchdogInterval,
		dockerclient:     client,
	}, err
}
-func (e *dockerExecutor) ImageLoaded(imageID string) bool {
- _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
- return err == nil
+func (e *dockerExecutor) Runtime() string {
+ v, _ := e.dockerclient.ServerVersion(context.Background())
+ info := ""
+ for _, cv := range v.Components {
+ if info != "" {
+ info += ", "
+ }
+ info += cv.Name + " " + cv.Version
+ }
+ if info == "" {
+ info = "(unknown version)"
+ }
+ return "docker " + info
}
-func (e *dockerExecutor) LoadImage(filename string) error {
- f, err := os.Open(filename)
+func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
+ containerClient *arvados.Client) error {
+ _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
+ if err == nil {
+ // already loaded
+ return nil
+ }
+
+ f, err := os.Open(imageTarballPath)
if err != nil {
return err
}
	defer f.Close()
	resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
	if err != nil {
		return fmt.Errorf("While loading container image into Docker: %v", err)
	}
	defer resp.Body.Close()
	buf, _ := ioutil.ReadAll(resp.Body)
	e.logf("loaded image: response %s", buf)
	return nil
}
-func (e *dockerExecutor) Create(spec containerSpec) error {
+func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, dockercontainer.HostConfig) {
e.logf("Creating Docker container")
	cfg := dockercontainer.Config{
		Image: spec.Image,
	}
	if spec.RAM > 0 && spec.RAM < minDockerRAM {
		spec.RAM = minDockerRAM
	}
	hostCfg := dockercontainer.HostConfig{
		Resources: dockercontainer.Resources{
			Memory:       spec.RAM, // RAM
			MemorySwap:   spec.RAM, // RAM+swap
			KernelMemory: spec.RAM, // kernel portion
		},
	}
+ if spec.CUDADeviceCount != 0 {
+ var deviceIds []string
+ if cudaVisibleDevices := os.Getenv("CUDA_VISIBLE_DEVICES"); cudaVisibleDevices != "" {
+ // If a resource manager such as slurm or LSF told
+ // us to select specific devices we need to propagate that.
+ deviceIds = strings.Split(cudaVisibleDevices, ",")
+ }
+
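+	// For example, CUDA_VISIBLE_DEVICES="2,3" yields deviceIds
+	// ["2", "3"] (and deviceCount is zeroed below, so the
+	// explicit IDs win).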
+ deviceCount := spec.CUDADeviceCount
+ if len(deviceIds) > 0 {
+ // Docker won't accept both non-empty
+ // DeviceIDs and a non-zero Count
+ //
+ // (it turns out "Count" is a dumb fallback
+ // that just allocates device 0, 1, 2, ...,
+ // Count-1)
+ deviceCount = 0
+ }
+
+ // Capabilities are confusing. The driver has generic
+ // capabilities "gpu" and "nvidia" but then there's
+ // additional capabilities "compute" and "utility"
+ // that are passed to nvidia-container-cli.
+ //
+ // "compute" means include the CUDA libraries and
+ // "utility" means include the CUDA utility programs
+ // (like nvidia-smi).
+ //
+ // https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/daemon/nvidia_linux.go#L37
+ // https://github.com/containerd/containerd/blob/main/contrib/nvidia/nvidia.go
+ hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
+ Driver: "nvidia",
+ Count: deviceCount,
+ DeviceIDs: deviceIds,
+ Capabilities: [][]string{[]string{"gpu", "nvidia", "compute", "utility"}},
+ })
+ }
for path, mount := range spec.BindMounts {
bind := mount.HostPath + ":" + path
		if mount.ReadOnly {
			bind += ":ro"
		}
		hostCfg.Binds = append(hostCfg.Binds, bind)
	}
if spec.EnableNetwork {
hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
}
+ return cfg, hostCfg
+}
- created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, e.containerUUID)
+func (e *dockerExecutor) Create(spec containerSpec) error {
+ cfg, hostCfg := e.config(spec)
+ created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, nil, e.containerUUID)
if err != nil {
return fmt.Errorf("While creating container: %v", err)
}
	e.containerID = created.ID
	return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
}
-func (e *dockerExecutor) CgroupID() string {
- return e.containerID
+func (e *dockerExecutor) Pid() int {
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+ defer cancel()
+ ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
+ if err == nil && ctr.State != nil {
+ return ctr.State.Pid
+ } else {
+ return 0
+ }
}
func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
			if ctx.Err() != nil {
				// Either the container already
				// exited, or our caller is trying to
				// kill it.
				return
			} else if err != nil {
- e.logf("Error inspecting container: %s", err)
- watchdogErr <- err
- return
+ watchdogErr <- fmt.Errorf("error inspecting container: %s", err)
} else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
- watchdogErr <- fmt.Errorf("Container is not running: State=%v", ctr.State)
- return
+ watchdogErr <- fmt.Errorf("container is not running: State=%v", ctr.State)
+ } else {
+ watchdogErr <- nil
}
}
}()
waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
+ errors := 0
for {
select {
case waitBody := <-waitOk:
- e.logf("Container exited with code: %v", waitBody.StatusCode)
// wait for stdout/stderr to complete
<-e.doneIO
return int(waitBody.StatusCode), nil
		case err := <-waitErr:
			return -1, fmt.Errorf("container wait: %v", err)
		case <-ctx.Done():
			return -1, ctx.Err()
case err := <-watchdogErr:
- return -1, err
+ if err == nil {
+ errors = 0
+ } else {
+ e.logf("docker watchdog: %s", err)
+ errors++
+ if errors >= dockerWatchdogThreshold {
+ e.logf("docker watchdog: giving up")
+ return -1, err
+ }
+ }
}
}
}
func (e *dockerExecutor) Close() {
e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
}
+
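+// InjectCommand returns a prepared exec.Cmd that runs "docker exec"
+// inside the running container; the caller starts and waits on it.
+// A hypothetical invocation (the detach keys and user shown here
+// are illustrative, not defaults this code sets):
+//
+//	cmd, err := e.InjectCommand(ctx, "ctrl-p,ctrl-q", "root", false, []string{"ps", "aux"})
+//	if err != nil { ... }
+//	out, err := cmd.CombinedOutput()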
+func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
+ cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
+ if usingTTY {
+ cmd.Args = append(cmd.Args, "-t")
+ }
+ cmd.Args = append(cmd.Args, e.containerID)
+ cmd.Args = append(cmd.Args, injectcmd...)
+ return cmd, nil
+}
+
+func (e *dockerExecutor) IPAddress() (string, error) {
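+	// The cached address is stored as a *string; Load() on a
+	// never-Stored atomic.Value returns nil, so this type
+	// assertion fails harmlessly before the first Store().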
+ if ip, ok := e.savedIPAddress.Load().(*string); ok {
+ return *ip, nil
+ }
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+ defer cancel()
+ ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
+ if err != nil {
+ return "", fmt.Errorf("cannot get docker container info: %s", err)
+ }
+ ip := ctr.NetworkSettings.IPAddress
+ if ip == "" {
+ // TODO: try to enable networking if it wasn't
+ // already enabled when the container was
+ // created.
+ return "", fmt.Errorf("container has no IP address")
+ }
+ e.savedIPAddress.Store(&ip)
+ return ip, nil
+}