21925: Don't pass full resource to progress bar useEffect to stop request spam
[arvados.git] / lib / crunchrun / docker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4 package crunchrun
5
6 import (
7         "context"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "os"
12         "os/exec"
13         "strings"
14         "sync/atomic"
15         "time"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         dockercontainer "github.com/docker/docker/api/types/container"
19         dockerclient "github.com/docker/docker/client"
20 )
21
22 // Docker daemon won't let you set a limit less than ~10 MiB
23 const minDockerRAM = int64(16 * 1024 * 1024)
24
25 // DockerAPIVersion is the API version we use to communicate with the
26 // docker service.  The oldest OS we support is Ubuntu 18.04 (bionic)
27 // which originally shipped docker 1.17.12 / API 1.35 so there is no
28 // reason to use an older API version.  See
29 // https://dev.arvados.org/issues/15370#note-38 and
30 // https://docs.docker.com/engine/api/.
31 const DockerAPIVersion = "1.35"
32
33 // Number of consecutive "inspect container" failures before
34 // concluding Docker is unresponsive, giving up, and cancelling the
35 // container.
36 const dockerWatchdogThreshold = 5
37
38 type dockerExecutor struct {
39         containerUUID    string
40         logf             func(string, ...interface{})
41         watchdogInterval time.Duration
42         dockerclient     *dockerclient.Client
43         containerID      string
44         savedIPAddress   atomic.Value
45         doneIO           chan struct{}
46         errIO            error
47 }
48
49 func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
50         // API version 1.21 corresponds to Docker 1.9, which is
51         // currently the minimum version we want to support.
52         client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, DockerAPIVersion, nil, nil)
53         if watchdogInterval < 1 {
54                 watchdogInterval = time.Minute * 2
55         }
56         return &dockerExecutor{
57                 containerUUID:    containerUUID,
58                 logf:             logf,
59                 watchdogInterval: watchdogInterval,
60                 dockerclient:     client,
61         }, err
62 }
63
64 func (e *dockerExecutor) Runtime() string {
65         v, _ := e.dockerclient.ServerVersion(context.Background())
66         info := ""
67         for _, cv := range v.Components {
68                 if info != "" {
69                         info += ", "
70                 }
71                 info += cv.Name + " " + cv.Version
72         }
73         if info == "" {
74                 info = "(unknown version)"
75         }
76         return "docker " + info
77 }
78
79 func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
80         containerClient *arvados.Client) error {
81         _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
82         if err == nil {
83                 // already loaded
84                 return nil
85         }
86
87         f, err := os.Open(imageTarballPath)
88         if err != nil {
89                 return err
90         }
91         defer f.Close()
92         resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
93         if err != nil {
94                 return fmt.Errorf("While loading container image into Docker: %v", err)
95         }
96         defer resp.Body.Close()
97         buf, _ := ioutil.ReadAll(resp.Body)
98         e.logf("loaded image: response %s", buf)
99         return nil
100 }
101
102 func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, dockercontainer.HostConfig) {
103         e.logf("Creating Docker container")
104         cfg := dockercontainer.Config{
105                 Image:        spec.Image,
106                 Cmd:          spec.Command,
107                 WorkingDir:   spec.WorkingDir,
108                 Volumes:      map[string]struct{}{},
109                 OpenStdin:    spec.Stdin != nil,
110                 StdinOnce:    spec.Stdin != nil,
111                 AttachStdin:  spec.Stdin != nil,
112                 AttachStdout: true,
113                 AttachStderr: true,
114         }
115         if cfg.WorkingDir == "." {
116                 cfg.WorkingDir = ""
117         }
118         for k, v := range spec.Env {
119                 cfg.Env = append(cfg.Env, k+"="+v)
120         }
121         if spec.RAM > 0 && spec.RAM < minDockerRAM {
122                 spec.RAM = minDockerRAM
123         }
124         hostCfg := dockercontainer.HostConfig{
125                 LogConfig: dockercontainer.LogConfig{
126                         Type: "none",
127                 },
128                 NetworkMode: dockercontainer.NetworkMode("none"),
129                 Resources: dockercontainer.Resources{
130                         CgroupParent: spec.CgroupParent,
131                         NanoCPUs:     int64(spec.VCPUs) * 1000000000,
132                         Memory:       spec.RAM, // RAM
133                         MemorySwap:   spec.RAM, // RAM+swap
134                         KernelMemory: spec.RAM, // kernel portion
135                 },
136         }
137         if spec.CUDADeviceCount != 0 {
138                 var deviceIds []string
139                 if cudaVisibleDevices := os.Getenv("CUDA_VISIBLE_DEVICES"); cudaVisibleDevices != "" {
140                         // If a resource manager such as slurm or LSF told
141                         // us to select specific devices we need to propagate that.
142                         deviceIds = strings.Split(cudaVisibleDevices, ",")
143                 }
144
145                 deviceCount := spec.CUDADeviceCount
146                 if len(deviceIds) > 0 {
147                         // Docker won't accept both non-empty
148                         // DeviceIDs and a non-zero Count
149                         //
150                         // (it turns out "Count" is a dumb fallback
151                         // that just allocates device 0, 1, 2, ...,
152                         // Count-1)
153                         deviceCount = 0
154                 }
155
156                 // Capabilities are confusing.  The driver has generic
157                 // capabilities "gpu" and "nvidia" but then there's
158                 // additional capabilities "compute" and "utility"
159                 // that are passed to nvidia-container-cli.
160                 //
161                 // "compute" means include the CUDA libraries and
162                 // "utility" means include the CUDA utility programs
163                 // (like nvidia-smi).
164                 //
165                 // https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/daemon/nvidia_linux.go#L37
166                 // https://github.com/containerd/containerd/blob/main/contrib/nvidia/nvidia.go
167                 hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
168                         Driver:       "nvidia",
169                         Count:        deviceCount,
170                         DeviceIDs:    deviceIds,
171                         Capabilities: [][]string{[]string{"gpu", "nvidia", "compute", "utility"}},
172                 })
173         }
174         for path, mount := range spec.BindMounts {
175                 bind := mount.HostPath + ":" + path
176                 if mount.ReadOnly {
177                         bind += ":ro"
178                 }
179                 hostCfg.Binds = append(hostCfg.Binds, bind)
180         }
181         if spec.EnableNetwork {
182                 hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
183         }
184         return cfg, hostCfg
185 }
186
187 func (e *dockerExecutor) Create(spec containerSpec) error {
188         cfg, hostCfg := e.config(spec)
189         created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, nil, e.containerUUID)
190         if err != nil {
191                 return fmt.Errorf("While creating container: %v", err)
192         }
193         e.containerID = created.ID
194         return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
195 }
196
197 func (e *dockerExecutor) Pid() int {
198         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
199         defer cancel()
200         ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
201         if err == nil && ctr.State != nil {
202                 return ctr.State.Pid
203         } else {
204                 return 0
205         }
206 }
207
208 func (e *dockerExecutor) Start() error {
209         return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockercontainer.StartOptions{})
210 }
211
212 func (e *dockerExecutor) Stop() error {
213         err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockercontainer.RemoveOptions{Force: true})
214         if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
215                 err = nil
216         }
217         return err
218 }
219
220 // Wait for the container to terminate, capture the exit code, and
221 // wait for stdout/stderr logging to finish.
222 func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
223         ctx, cancel := context.WithCancel(ctx)
224         defer cancel()
225         watchdogErr := make(chan error, 1)
226         go func() {
227                 ticker := time.NewTicker(e.watchdogInterval)
228                 defer ticker.Stop()
229                 for range ticker.C {
230                         dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
231                         ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
232                         dcancel()
233                         if ctx.Err() != nil {
234                                 // Either the container already
235                                 // exited, or our caller is trying to
236                                 // kill it.
237                                 return
238                         } else if err != nil {
239                                 watchdogErr <- fmt.Errorf("error inspecting container: %s", err)
240                         } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
241                                 watchdogErr <- fmt.Errorf("container is not running: State=%v", ctr.State)
242                         } else {
243                                 watchdogErr <- nil
244                         }
245                 }
246         }()
247
248         waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
249         errors := 0
250         for {
251                 select {
252                 case waitBody := <-waitOk:
253                         // wait for stdout/stderr to complete
254                         <-e.doneIO
255                         return int(waitBody.StatusCode), nil
256
257                 case err := <-waitErr:
258                         return -1, fmt.Errorf("container wait: %v", err)
259
260                 case <-ctx.Done():
261                         return -1, ctx.Err()
262
263                 case err := <-watchdogErr:
264                         if err == nil {
265                                 errors = 0
266                         } else {
267                                 e.logf("docker watchdog: %s", err)
268                                 errors++
269                                 if errors >= dockerWatchdogThreshold {
270                                         e.logf("docker watchdog: giving up")
271                                         return -1, err
272                                 }
273                         }
274                 }
275         }
276 }
277
278 func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
279         resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockercontainer.AttachOptions{
280                 Stream: true,
281                 Stdin:  stdin != nil,
282                 Stdout: true,
283                 Stderr: true,
284         })
285         if err != nil {
286                 return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
287         }
288         var errStdin error
289         if stdin != nil {
290                 go func() {
291                         errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
292                 }()
293         }
294         e.doneIO = make(chan struct{})
295         go func() {
296                 e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
297                 if e.errIO == nil && errStdin != nil {
298                         e.errIO = errStdin
299                 }
300                 close(e.doneIO)
301         }()
302         return nil
303 }
304
305 func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
306         defer closeConn()
307         _, err := io.Copy(conn, stdin)
308         if err != nil {
309                 return fmt.Errorf("While writing to docker container on stdin: %v", err)
310         }
311         return nil
312 }
313
314 // Handle docker log protocol; see
315 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
316 func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
317         header := make([]byte, 8)
318         var err error
319         for err == nil {
320                 _, err = io.ReadAtLeast(reader, header, 8)
321                 if err != nil {
322                         if err == io.EOF {
323                                 err = nil
324                         }
325                         break
326                 }
327                 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
328                 if header[0] == 1 {
329                         _, err = io.CopyN(stdout, reader, readsize)
330                 } else {
331                         // stderr
332                         _, err = io.CopyN(stderr, reader, readsize)
333                 }
334         }
335         if err != nil {
336                 return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
337         }
338         return nil
339 }
340
341 func (e *dockerExecutor) Close() {
342         e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockercontainer.RemoveOptions{Force: true})
343 }
344
345 func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
346         cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
347         if usingTTY {
348                 cmd.Args = append(cmd.Args, "-t")
349         }
350         cmd.Args = append(cmd.Args, e.containerID)
351         cmd.Args = append(cmd.Args, injectcmd...)
352         return cmd, nil
353 }
354
355 func (e *dockerExecutor) IPAddress() (string, error) {
356         if ip, ok := e.savedIPAddress.Load().(*string); ok {
357                 return *ip, nil
358         }
359         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
360         defer cancel()
361         ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
362         if err != nil {
363                 return "", fmt.Errorf("cannot get docker container info: %s", err)
364         }
365         ip := ctr.NetworkSettings.IPAddress
366         if ip == "" {
367                 // TODO: try to enable networking if it wasn't
368                 // already enabled when the container was
369                 // created.
370                 return "", fmt.Errorf("container has no IP address")
371         }
372         e.savedIPAddress.Store(&ip)
373         return ip, nil
374 }