X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/76f82a555481de947be1ee70530e61266b7dcbf7..461fdaa1b96142b8065c131ae0334046fc71ea56:/lib/crunchrun/docker.go

diff --git a/lib/crunchrun/docker.go b/lib/crunchrun/docker.go
index ab00273ef3..4f449133f3 100644
--- a/lib/crunchrun/docker.go
+++ b/lib/crunchrun/docker.go
@@ -4,29 +4,45 @@ package crunchrun

 import (
+    "context"
     "fmt"
     "io"
     "io/ioutil"
     "os"
+    "os/exec"
     "strings"
+    "sync/atomic"
     "time"

     "git.arvados.org/arvados.git/sdk/go/arvados"
     dockertypes "github.com/docker/docker/api/types"
     dockercontainer "github.com/docker/docker/api/types/container"
     dockerclient "github.com/docker/docker/client"
-    "golang.org/x/net/context"
 )

 // Docker daemon won't let you set a limit less than ~10 MiB
 const minDockerRAM = int64(16 * 1024 * 1024)

+// DockerAPIVersion is the API version we use to communicate with the
+// docker service. The oldest OS we support is Ubuntu 18.04 (bionic),
+// which originally shipped docker 17.12 / API 1.35, so there is no
+// reason to use an older API version. See
+// https://dev.arvados.org/issues/15370#note-38 and
+// https://docs.docker.com/engine/api/.
+const DockerAPIVersion = "1.35"
+
+// Number of consecutive "inspect container" failures before
+// concluding Docker is unresponsive, giving up, and cancelling the
+// container.
+const dockerWatchdogThreshold = 5
+
 type dockerExecutor struct {
     containerUUID    string
     logf             func(string, ...interface{})
     watchdogInterval time.Duration
     dockerclient     *dockerclient.Client
     containerID      string
+    savedIPAddress   atomic.Value
     doneIO           chan struct{}
     errIO            error
 }
@@ -34,9 +50,9 @@ type dockerExecutor struct {
 func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
     // API version 1.21 corresponds to Docker 1.9, which is
     // currently the minimum version we want to support.
-    client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
+    client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, DockerAPIVersion, nil, nil)
     if watchdogInterval < 1 {
-        watchdogInterval = time.Minute
+        watchdogInterval = time.Minute * 2
     }
     return &dockerExecutor{
         containerUUID:    containerUUID,
@@ -46,7 +62,20 @@ func newDockerExecutor(containerUUID string, logf func(string, ...interface{}),
     }, err
 }

-func (e *dockerExecutor) Runtime() string { return "docker" }
+func (e *dockerExecutor) Runtime() string {
+    v, _ := e.dockerclient.ServerVersion(context.Background())
+    info := ""
+    for _, cv := range v.Components {
+        if info != "" {
+            info += ", "
+        }
+        info += cv.Name + " " + cv.Version
+    }
+    if info == "" {
+        info = "(unknown version)"
+    }
+    return "docker " + info
+}

 func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container,
     arvMountPoint string, containerClient *arvados.Client) error {
@@ -107,10 +136,40 @@ func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, doc
         },
     }
     if spec.CUDADeviceCount != 0 {
+        var deviceIds []string
+        if cudaVisibleDevices := os.Getenv("CUDA_VISIBLE_DEVICES"); cudaVisibleDevices != "" {
+            // If a resource manager such as slurm or LSF told
+            // us to select specific devices we need to propagate that.
+            deviceIds = strings.Split(cudaVisibleDevices, ",")
+        }
+
+        deviceCount := spec.CUDADeviceCount
+        if len(deviceIds) > 0 {
+            // Docker won't accept both non-empty
+            // DeviceIDs and a non-zero Count
+            //
+            // (it turns out "Count" is a dumb fallback
+            // that just allocates device 0, 1, 2, ...,
+            // Count-1)
+            deviceCount = 0
+        }
+
+        // Capabilities are confusing. The driver has generic
+        // capabilities "gpu" and "nvidia" but then there's
+        // additional capabilities "compute" and "utility"
+        // that are passed to nvidia-container-cli.
+        //
+        // "compute" means include the CUDA libraries and
+        // "utility" means include the CUDA utility programs
+        // (like nvidia-smi).
+        //
+        // https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/daemon/nvidia_linux.go#L37
+        // https://github.com/containerd/containerd/blob/main/contrib/nvidia/nvidia.go
         hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
             Driver:       "nvidia",
-            Count:        spec.CUDADeviceCount,
-            Capabilities: [][]string{[]string{"gpu", "nvidia", "compute"}},
+            Count:        deviceCount,
+            DeviceIDs:    deviceIds,
+            Capabilities: [][]string{[]string{"gpu", "nvidia", "compute", "utility"}},
         })
     }
     for path, mount := range spec.BindMounts {
@@ -128,7 +187,7 @@ func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, doc

 func (e *dockerExecutor) Create(spec containerSpec) error {
     cfg, hostCfg := e.config(spec)
-    created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, e.containerUUID)
+    created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, nil, e.containerUUID)
     if err != nil {
         return fmt.Errorf("While creating container: %v", err)
     }
@@ -136,8 +195,15 @@ func (e *dockerExecutor) Create(spec containerSpec) error {
     return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
 }

-func (e *dockerExecutor) CgroupID() string {
-    return e.containerID
+func (e *dockerExecutor) Pid() int {
+    ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+    defer cancel()
+    ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
+    if err == nil && ctr.State != nil {
+        return ctr.State.Pid
+    } else {
+        return 0
+    }
 }

 func (e *dockerExecutor) Start() error {
@@ -171,21 +237,20 @@ func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
                 // kill it.
                 return
             } else if err != nil {
-                e.logf("Error inspecting container: %s", err)
-                watchdogErr <- err
-                return
+                watchdogErr <- fmt.Errorf("error inspecting container: %s", err)
             } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
-                watchdogErr <- fmt.Errorf("Container is not running: State=%v", ctr.State)
-                return
+                watchdogErr <- fmt.Errorf("container is not running: State=%v", ctr.State)
+            } else {
+                watchdogErr <- nil
             }
         }
     }()

     waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
+    errors := 0
     for {
         select {
         case waitBody := <-waitOk:
-            e.logf("Container exited with code: %v", waitBody.StatusCode)
             // wait for stdout/stderr to complete
             <-e.doneIO
             return int(waitBody.StatusCode), nil
@@ -197,7 +262,16 @@ func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
             return -1, ctx.Err()

         case err := <-watchdogErr:
-            return -1, err
+            if err == nil {
+                errors = 0
+            } else {
+                e.logf("docker watchdog: %s", err)
+                errors++
+                if errors >= dockerWatchdogThreshold {
+                    e.logf("docker watchdog: giving up")
+                    return -1, err
+                }
+            }
         }
     }
 }
@@ -268,3 +342,34 @@ func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.
 func (e *dockerExecutor) Close() {
     e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
 }
+
+func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
+    cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
+    if usingTTY {
+        cmd.Args = append(cmd.Args, "-t")
+    }
+    cmd.Args = append(cmd.Args, e.containerID)
+    cmd.Args = append(cmd.Args, injectcmd...)
+    return cmd, nil
+}
+
+func (e *dockerExecutor) IPAddress() (string, error) {
+    if ip, ok := e.savedIPAddress.Load().(*string); ok {
+        return *ip, nil
+    }
+    ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+    defer cancel()
+    ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
+    if err != nil {
+        return "", fmt.Errorf("cannot get docker container info: %s", err)
+    }
+    ip := ctr.NetworkSettings.IPAddress
+    if ip == "" {
+        // TODO: try to enable networking if it wasn't
+        // already enabled when the container was
+        // created.
+        return "", fmt.Errorf("container has no IP address")
+    }
+    e.savedIPAddress.Store(&ip)
+    return ip, nil
+}
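
The device-selection logic added to config() comes down to one rule: if a resource manager has already pinned specific GPUs via CUDA_VISIBLE_DEVICES, pass those IDs through and leave Count at zero, because Docker's DeviceRequest rejects a non-zero Count combined with explicit DeviceIDs. A minimal standalone sketch of that rule, using only the Go standard library (the helper name chooseDevices is illustrative and not part of crunchrun):

package main

import (
    "fmt"
    "os"
    "strings"
)

// chooseDevices mirrors the rule above: explicit device IDs from the
// environment win; otherwise fall back to a plain device count.
func chooseDevices(requested int) (deviceIDs []string, count int) {
    if env := os.Getenv("CUDA_VISIBLE_DEVICES"); env != "" {
        // A scheduler such as slurm or LSF already picked specific GPUs;
        // propagate them and zero the count so Docker accepts the request.
        return strings.Split(env, ","), 0
    }
    // No explicit selection: let Docker allocate devices 0..requested-1.
    return nil, requested
}

func main() {
    ids, count := chooseDevices(2)
    fmt.Printf("DeviceIDs=%v Count=%d\n", ids, count)
}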
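
The reworked Wait() loop no longer aborts on the first failed ContainerInspect call: each watchdog error increments a counter, any success resets it, and only dockerWatchdogThreshold consecutive failures cancel the container. A small self-contained sketch of that counting pattern (the runWatchdog helper and its parameters are illustrative, not crunchrun API):

package main

import (
    "errors"
    "fmt"
)

const watchdogThreshold = 5 // plays the role of dockerWatchdogThreshold

// runWatchdog calls probe up to maxProbes times and returns an error only
// after watchdogThreshold consecutive failures; any success resets the count.
func runWatchdog(probe func() error, maxProbes int) error {
    consecutive := 0
    for i := 0; i < maxProbes; i++ {
        if err := probe(); err != nil {
            consecutive++
            if consecutive >= watchdogThreshold {
                return fmt.Errorf("watchdog giving up: %w", err)
            }
            continue
        }
        consecutive = 0
    }
    return nil
}

func main() {
    n := 0
    err := runWatchdog(func() error {
        n++
        if n%4 != 0 {
            return errors.New("inspect failed")
        }
        return nil // an occasional success keeps resetting the counter
    }, 20)
    fmt.Println("result:", err) // nil: failures never reach the threshold
}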