Merge branch '19744-acr-crunchstat' refs #19744
[arvados.git] / lib / crunchrun / docker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4 package crunchrun
5
6 import (
7         "context"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "os"
12         "os/exec"
13         "strings"
14         "sync/atomic"
15         "time"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         dockertypes "github.com/docker/docker/api/types"
19         dockercontainer "github.com/docker/docker/api/types/container"
20         dockerclient "github.com/docker/docker/client"
21 )
22
23 // Docker daemon won't let you set a limit less than ~10 MiB
24 const minDockerRAM = int64(16 * 1024 * 1024)
25
26 // DockerAPIVersion is the API version we use to communicate with the
27 // docker service.  The oldest OS we support is Ubuntu 18.04 (bionic)
28 // which originally shipped docker 1.17.12 / API 1.35 so there is no
29 // reason to use an older API version.  See
30 // https://dev.arvados.org/issues/15370#note-38 and
31 // https://docs.docker.com/engine/api/.
32 const DockerAPIVersion = "1.35"
33
34 // Number of consecutive "inspect container" failures before
35 // concluding Docker is unresponsive, giving up, and cancelling the
36 // container.
37 const dockerWatchdogThreshold = 5
38
39 type dockerExecutor struct {
40         containerUUID    string
41         logf             func(string, ...interface{})
42         watchdogInterval time.Duration
43         dockerclient     *dockerclient.Client
44         containerID      string
45         savedIPAddress   atomic.Value
46         doneIO           chan struct{}
47         errIO            error
48 }
49
50 func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
51         // API version 1.21 corresponds to Docker 1.9, which is
52         // currently the minimum version we want to support.
53         client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, DockerAPIVersion, nil, nil)
54         if watchdogInterval < 1 {
55                 watchdogInterval = time.Minute * 2
56         }
57         return &dockerExecutor{
58                 containerUUID:    containerUUID,
59                 logf:             logf,
60                 watchdogInterval: watchdogInterval,
61                 dockerclient:     client,
62         }, err
63 }
64
65 func (e *dockerExecutor) Runtime() string {
66         v, _ := e.dockerclient.ServerVersion(context.Background())
67         info := ""
68         for _, cv := range v.Components {
69                 if info != "" {
70                         info += ", "
71                 }
72                 info += cv.Name + " " + cv.Version
73         }
74         if info == "" {
75                 info = "(unknown version)"
76         }
77         return "docker " + info
78 }
79
80 func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
81         containerClient *arvados.Client) error {
82         _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
83         if err == nil {
84                 // already loaded
85                 return nil
86         }
87
88         f, err := os.Open(imageTarballPath)
89         if err != nil {
90                 return err
91         }
92         defer f.Close()
93         resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
94         if err != nil {
95                 return fmt.Errorf("While loading container image into Docker: %v", err)
96         }
97         defer resp.Body.Close()
98         buf, _ := ioutil.ReadAll(resp.Body)
99         e.logf("loaded image: response %s", buf)
100         return nil
101 }
102
103 func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, dockercontainer.HostConfig) {
104         e.logf("Creating Docker container")
105         cfg := dockercontainer.Config{
106                 Image:        spec.Image,
107                 Cmd:          spec.Command,
108                 WorkingDir:   spec.WorkingDir,
109                 Volumes:      map[string]struct{}{},
110                 OpenStdin:    spec.Stdin != nil,
111                 StdinOnce:    spec.Stdin != nil,
112                 AttachStdin:  spec.Stdin != nil,
113                 AttachStdout: true,
114                 AttachStderr: true,
115         }
116         if cfg.WorkingDir == "." {
117                 cfg.WorkingDir = ""
118         }
119         for k, v := range spec.Env {
120                 cfg.Env = append(cfg.Env, k+"="+v)
121         }
122         if spec.RAM > 0 && spec.RAM < minDockerRAM {
123                 spec.RAM = minDockerRAM
124         }
125         hostCfg := dockercontainer.HostConfig{
126                 LogConfig: dockercontainer.LogConfig{
127                         Type: "none",
128                 },
129                 NetworkMode: dockercontainer.NetworkMode("none"),
130                 Resources: dockercontainer.Resources{
131                         CgroupParent: spec.CgroupParent,
132                         NanoCPUs:     int64(spec.VCPUs) * 1000000000,
133                         Memory:       spec.RAM, // RAM
134                         MemorySwap:   spec.RAM, // RAM+swap
135                         KernelMemory: spec.RAM, // kernel portion
136                 },
137         }
138         if spec.CUDADeviceCount != 0 {
139                 var deviceIds []string
140                 if cudaVisibleDevices := os.Getenv("CUDA_VISIBLE_DEVICES"); cudaVisibleDevices != "" {
141                         // If a resource manager such as slurm or LSF told
142                         // us to select specific devices we need to propagate that.
143                         deviceIds = strings.Split(cudaVisibleDevices, ",")
144                 }
145
146                 deviceCount := spec.CUDADeviceCount
147                 if len(deviceIds) > 0 {
148                         // Docker won't accept both non-empty
149                         // DeviceIDs and a non-zero Count
150                         //
151                         // (it turns out "Count" is a dumb fallback
152                         // that just allocates device 0, 1, 2, ...,
153                         // Count-1)
154                         deviceCount = 0
155                 }
156
157                 // Capabilities are confusing.  The driver has generic
158                 // capabilities "gpu" and "nvidia" but then there's
159                 // additional capabilities "compute" and "utility"
160                 // that are passed to nvidia-container-cli.
161                 //
162                 // "compute" means include the CUDA libraries and
163                 // "utility" means include the CUDA utility programs
164                 // (like nvidia-smi).
165                 //
166                 // https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/daemon/nvidia_linux.go#L37
167                 // https://github.com/containerd/containerd/blob/main/contrib/nvidia/nvidia.go
168                 hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
169                         Driver:       "nvidia",
170                         Count:        deviceCount,
171                         DeviceIDs:    deviceIds,
172                         Capabilities: [][]string{[]string{"gpu", "nvidia", "compute", "utility"}},
173                 })
174         }
175         for path, mount := range spec.BindMounts {
176                 bind := mount.HostPath + ":" + path
177                 if mount.ReadOnly {
178                         bind += ":ro"
179                 }
180                 hostCfg.Binds = append(hostCfg.Binds, bind)
181         }
182         if spec.EnableNetwork {
183                 hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
184         }
185         return cfg, hostCfg
186 }
187
188 func (e *dockerExecutor) Create(spec containerSpec) error {
189         cfg, hostCfg := e.config(spec)
190         created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, nil, e.containerUUID)
191         if err != nil {
192                 return fmt.Errorf("While creating container: %v", err)
193         }
194         e.containerID = created.ID
195         return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
196 }
197
198 func (e *dockerExecutor) Pid() int {
199         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
200         defer cancel()
201         ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
202         if err == nil && ctr.State != nil {
203                 return ctr.State.Pid
204         } else {
205                 return 0
206         }
207 }
208
209 func (e *dockerExecutor) Start() error {
210         return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockertypes.ContainerStartOptions{})
211 }
212
213 func (e *dockerExecutor) Stop() error {
214         err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
215         if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
216                 err = nil
217         }
218         return err
219 }
220
221 // Wait for the container to terminate, capture the exit code, and
222 // wait for stdout/stderr logging to finish.
223 func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
224         ctx, cancel := context.WithCancel(ctx)
225         defer cancel()
226         watchdogErr := make(chan error, 1)
227         go func() {
228                 ticker := time.NewTicker(e.watchdogInterval)
229                 defer ticker.Stop()
230                 for range ticker.C {
231                         dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
232                         ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
233                         dcancel()
234                         if ctx.Err() != nil {
235                                 // Either the container already
236                                 // exited, or our caller is trying to
237                                 // kill it.
238                                 return
239                         } else if err != nil {
240                                 watchdogErr <- fmt.Errorf("error inspecting container: %s", err)
241                         } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
242                                 watchdogErr <- fmt.Errorf("container is not running: State=%v", ctr.State)
243                         } else {
244                                 watchdogErr <- nil
245                         }
246                 }
247         }()
248
249         waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
250         errors := 0
251         for {
252                 select {
253                 case waitBody := <-waitOk:
254                         // wait for stdout/stderr to complete
255                         <-e.doneIO
256                         return int(waitBody.StatusCode), nil
257
258                 case err := <-waitErr:
259                         return -1, fmt.Errorf("container wait: %v", err)
260
261                 case <-ctx.Done():
262                         return -1, ctx.Err()
263
264                 case err := <-watchdogErr:
265                         if err == nil {
266                                 errors = 0
267                         } else {
268                                 e.logf("docker watchdog: %s", err)
269                                 errors++
270                                 if errors >= dockerWatchdogThreshold {
271                                         e.logf("docker watchdog: giving up")
272                                         return -1, err
273                                 }
274                         }
275                 }
276         }
277 }
278
279 func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
280         resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
281                 Stream: true,
282                 Stdin:  stdin != nil,
283                 Stdout: true,
284                 Stderr: true,
285         })
286         if err != nil {
287                 return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
288         }
289         var errStdin error
290         if stdin != nil {
291                 go func() {
292                         errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
293                 }()
294         }
295         e.doneIO = make(chan struct{})
296         go func() {
297                 e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
298                 if e.errIO == nil && errStdin != nil {
299                         e.errIO = errStdin
300                 }
301                 close(e.doneIO)
302         }()
303         return nil
304 }
305
306 func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
307         defer closeConn()
308         _, err := io.Copy(conn, stdin)
309         if err != nil {
310                 return fmt.Errorf("While writing to docker container on stdin: %v", err)
311         }
312         return nil
313 }
314
315 // Handle docker log protocol; see
316 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
317 func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
318         header := make([]byte, 8)
319         var err error
320         for err == nil {
321                 _, err = io.ReadAtLeast(reader, header, 8)
322                 if err != nil {
323                         if err == io.EOF {
324                                 err = nil
325                         }
326                         break
327                 }
328                 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
329                 if header[0] == 1 {
330                         _, err = io.CopyN(stdout, reader, readsize)
331                 } else {
332                         // stderr
333                         _, err = io.CopyN(stderr, reader, readsize)
334                 }
335         }
336         if err != nil {
337                 return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
338         }
339         return nil
340 }
341
342 func (e *dockerExecutor) Close() {
343         e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
344 }
345
346 func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
347         cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
348         if usingTTY {
349                 cmd.Args = append(cmd.Args, "-t")
350         }
351         cmd.Args = append(cmd.Args, e.containerID)
352         cmd.Args = append(cmd.Args, injectcmd...)
353         return cmd, nil
354 }
355
356 func (e *dockerExecutor) IPAddress() (string, error) {
357         if ip, ok := e.savedIPAddress.Load().(*string); ok {
358                 return *ip, nil
359         }
360         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
361         defer cancel()
362         ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
363         if err != nil {
364                 return "", fmt.Errorf("cannot get docker container info: %s", err)
365         }
366         ip := ctr.NetworkSettings.IPAddress
367         if ip == "" {
368                 // TODO: try to enable networking if it wasn't
369                 // already enabled when the container was
370                 // created.
371                 return "", fmt.Errorf("container has no IP address")
372         }
373         e.savedIPAddress.Store(&ip)
374         return ip, nil
375 }