// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 dockertypes "github.com/docker/docker/api/types"
19 dockercontainer "github.com/docker/docker/api/types/container"
20 dockerclient "github.com/docker/docker/client"
23 // Docker daemon won't let you set a limit less than ~10 MiB
24 const minDockerRAM = int64(16 * 1024 * 1024)
26 // DockerAPIVersion is the API version we use to communicate with the
27 // docker service. The oldest OS we support is Ubuntu 18.04 (bionic)
28 // which originally shipped docker 1.17.12 / API 1.35 so there is no
29 // reason to use an older API version. See
30 // https://dev.arvados.org/issues/15370#note-38 and
31 // https://docs.docker.com/engine/api/.
32 const DockerAPIVersion = "1.35"
// Number of consecutive "inspect container" failures before
// concluding Docker is unresponsive, giving up, and cancelling the
// container.
const dockerWatchdogThreshold = 5
39 type dockerExecutor struct {
41 logf func(string, ...interface{})
42 watchdogInterval time.Duration
43 dockerclient *dockerclient.Client
45 savedIPAddress atomic.Value
50 func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
51 // API version 1.21 corresponds to Docker 1.9, which is
52 // currently the minimum version we want to support.
53 client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, DockerAPIVersion, nil, nil)
54 if watchdogInterval < 1 {
55 watchdogInterval = time.Minute * 2
57 return &dockerExecutor{
58 containerUUID: containerUUID,
60 watchdogInterval: watchdogInterval,
65 func (e *dockerExecutor) Runtime() string {
66 v, _ := e.dockerclient.ServerVersion(context.Background())
68 for _, cv := range v.Components {
72 info += cv.Name + " " + cv.Version
75 info = "(unknown version)"
77 return "docker " + info
80 func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
81 containerClient *arvados.Client) error {
82 _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
88 f, err := os.Open(imageTarballPath)
93 resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
95 return fmt.Errorf("While loading container image into Docker: %v", err)
97 defer resp.Body.Close()
98 buf, _ := ioutil.ReadAll(resp.Body)
99 e.logf("loaded image: response %s", buf)
103 func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, dockercontainer.HostConfig) {
104 e.logf("Creating Docker container")
105 cfg := dockercontainer.Config{
108 WorkingDir: spec.WorkingDir,
109 Volumes: map[string]struct{}{},
110 OpenStdin: spec.Stdin != nil,
111 StdinOnce: spec.Stdin != nil,
112 AttachStdin: spec.Stdin != nil,
116 if cfg.WorkingDir == "." {
119 for k, v := range spec.Env {
120 cfg.Env = append(cfg.Env, k+"="+v)
122 if spec.RAM > 0 && spec.RAM < minDockerRAM {
123 spec.RAM = minDockerRAM
125 hostCfg := dockercontainer.HostConfig{
126 LogConfig: dockercontainer.LogConfig{
129 NetworkMode: dockercontainer.NetworkMode("none"),
130 Resources: dockercontainer.Resources{
131 CgroupParent: spec.CgroupParent,
132 NanoCPUs: int64(spec.VCPUs) * 1000000000,
133 Memory: spec.RAM, // RAM
134 MemorySwap: spec.RAM, // RAM+swap
135 KernelMemory: spec.RAM, // kernel portion
138 if spec.CUDADeviceCount != 0 {
139 var deviceIds []string
140 if cudaVisibleDevices := os.Getenv("CUDA_VISIBLE_DEVICES"); cudaVisibleDevices != "" {
141 // If a resource manager such as slurm or LSF told
142 // us to select specific devices we need to propagate that.
143 deviceIds = strings.Split(cudaVisibleDevices, ",")
146 deviceCount := spec.CUDADeviceCount
147 if len(deviceIds) > 0 {
148 // Docker won't accept both non-empty
149 // DeviceIDs and a non-zero Count
151 // (it turns out "Count" is a dumb fallback
152 // that just allocates device 0, 1, 2, ...,
157 // Capabilities are confusing. The driver has generic
158 // capabilities "gpu" and "nvidia" but then there's
159 // additional capabilities "compute" and "utility"
160 // that are passed to nvidia-container-cli.
162 // "compute" means include the CUDA libraries and
163 // "utility" means include the CUDA utility programs
164 // (like nvidia-smi).
166 // https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/daemon/nvidia_linux.go#L37
167 // https://github.com/containerd/containerd/blob/main/contrib/nvidia/nvidia.go
168 hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
171 DeviceIDs: deviceIds,
172 Capabilities: [][]string{[]string{"gpu", "nvidia", "compute", "utility"}},
175 for path, mount := range spec.BindMounts {
176 bind := mount.HostPath + ":" + path
180 hostCfg.Binds = append(hostCfg.Binds, bind)
182 if spec.EnableNetwork {
183 hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
188 func (e *dockerExecutor) Create(spec containerSpec) error {
189 cfg, hostCfg := e.config(spec)
190 created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, nil, e.containerUUID)
192 return fmt.Errorf("While creating container: %v", err)
194 e.containerID = created.ID
195 return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
198 func (e *dockerExecutor) Pid() int {
199 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
201 ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
202 if err == nil && ctr.State != nil {
209 func (e *dockerExecutor) Start() error {
210 return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockertypes.ContainerStartOptions{})
213 func (e *dockerExecutor) Stop() error {
214 err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
215 if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
221 // Wait for the container to terminate, capture the exit code, and
222 // wait for stdout/stderr logging to finish.
223 func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
224 ctx, cancel := context.WithCancel(ctx)
226 watchdogErr := make(chan error, 1)
228 ticker := time.NewTicker(e.watchdogInterval)
231 dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
232 ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
234 if ctx.Err() != nil {
235 // Either the container already
236 // exited, or our caller is trying to
239 } else if err != nil {
240 watchdogErr <- fmt.Errorf("error inspecting container: %s", err)
241 } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
242 watchdogErr <- fmt.Errorf("container is not running: State=%v", ctr.State)
249 waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
253 case waitBody := <-waitOk:
254 // wait for stdout/stderr to complete
256 return int(waitBody.StatusCode), nil
258 case err := <-waitErr:
259 return -1, fmt.Errorf("container wait: %v", err)
264 case err := <-watchdogErr:
268 e.logf("docker watchdog: %s", err)
270 if errors >= dockerWatchdogThreshold {
271 e.logf("docker watchdog: giving up")
279 func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
280 resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
287 return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
292 errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
295 e.doneIO = make(chan struct{})
297 e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
298 if e.errIO == nil && errStdin != nil {
306 func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
308 _, err := io.Copy(conn, stdin)
310 return fmt.Errorf("While writing to docker container on stdin: %v", err)
315 // Handle docker log protocol; see
316 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
317 func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
318 header := make([]byte, 8)
321 _, err = io.ReadAtLeast(reader, header, 8)
328 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
330 _, err = io.CopyN(stdout, reader, readsize)
333 _, err = io.CopyN(stderr, reader, readsize)
337 return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
342 func (e *dockerExecutor) Close() {
343 e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
346 func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
347 cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
349 cmd.Args = append(cmd.Args, "-t")
351 cmd.Args = append(cmd.Args, e.containerID)
352 cmd.Args = append(cmd.Args, injectcmd...)
356 func (e *dockerExecutor) IPAddress() (string, error) {
357 if ip, ok := e.savedIPAddress.Load().(*string); ok {
360 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
362 ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
364 return "", fmt.Errorf("cannot get docker container info: %s", err)
366 ip := ctr.NetworkSettings.IPAddress
368 // TODO: try to enable networking if it wasn't
369 // already enabled when the container was
371 return "", fmt.Errorf("container has no IP address")
373 e.savedIPAddress.Store(&ip)