]> git.arvados.org - arvados.git/blob - lib/crunchrun/docker.go
Merge branch '23009-multiselect-bug' into main. Closes #23009
[arvados.git] / lib / crunchrun / docker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4 package crunchrun
5
6 import (
7         "context"
8         "encoding/json"
9         "fmt"
10         "io"
11         "os"
12         "os/exec"
13         "slices"
14         "strconv"
15         "strings"
16         "sync/atomic"
17         "syscall"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         dockercontainer "github.com/docker/docker/api/types/container"
22         dockerclient "github.com/docker/docker/client"
23         "github.com/docker/docker/pkg/jsonmessage"
24 )
25
26 // Docker daemon won't let you set a limit less than ~10 MiB
27 const minDockerRAM = int64(16 * 1024 * 1024)
28
29 // DockerAPIVersion is the API version we use to communicate with the
30 // docker service.  The oldest OS we support is Ubuntu 18.04 (bionic)
31 // which originally shipped docker 1.17.12 / API 1.35 so there is no
32 // reason to use an older API version.  See
33 // https://dev.arvados.org/issues/15370#note-38 and
34 // https://docs.docker.com/engine/api/.
35 const DockerAPIVersion = "1.35"
36
37 // Number of consecutive "inspect container" failures before
38 // concluding Docker is unresponsive, giving up, and cancelling the
39 // container.
40 const dockerWatchdogThreshold = 5
41
42 type dockerExecutor struct {
43         containerUUID    string
44         logf             func(string, ...interface{})
45         watchdogInterval time.Duration
46         dockerclient     *dockerclient.Client
47         containerID      string
48         savedIPAddress   atomic.Value
49         doneIO           chan struct{}
50         errIO            error
51 }
52
53 func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
54         // API version 1.21 corresponds to Docker 1.9, which is
55         // currently the minimum version we want to support.
56         client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, DockerAPIVersion, nil, nil)
57         if watchdogInterval < 1 {
58                 watchdogInterval = time.Minute * 2
59         }
60         return &dockerExecutor{
61                 containerUUID:    containerUUID,
62                 logf:             logf,
63                 watchdogInterval: watchdogInterval,
64                 dockerclient:     client,
65         }, err
66 }
67
68 func (e *dockerExecutor) Runtime() string {
69         v, _ := e.dockerclient.ServerVersion(context.Background())
70         info := ""
71         for _, cv := range v.Components {
72                 if info != "" {
73                         info += ", "
74                 }
75                 info += cv.Name + " " + cv.Version
76         }
77         if info == "" {
78                 info = "(unknown version)"
79         }
80         return "docker " + info
81 }
82
83 func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
84         containerClient *arvados.Client) error {
85         _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
86         if err == nil {
87                 // already loaded
88                 return nil
89         }
90
91         f, err := os.Open(imageTarballPath)
92         if err != nil {
93                 return err
94         }
95         defer f.Close()
96         resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
97         if err != nil {
98                 return fmt.Errorf("ImageLoad: %w", err)
99         }
100         defer resp.Body.Close()
101         var message jsonmessage.JSONMessage
102         err = json.NewDecoder(resp.Body).Decode(&message)
103         if err != nil {
104                 return fmt.Errorf("could not parse Docker response: %w", err)
105         }
106         if message.Error != nil {
107                 return fmt.Errorf("ImageLoad: %w", message.Error)
108         }
109         // message.Stream is typically "Loaded image: hello-world:latest\n"
110         e.logf("%s", strings.TrimSuffix(message.Stream, "\n"))
111         return nil
112 }
113
114 func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, dockercontainer.HostConfig) {
115         e.logf("Creating Docker container")
116         cfg := dockercontainer.Config{
117                 Image:        spec.Image,
118                 Cmd:          spec.Command,
119                 WorkingDir:   spec.WorkingDir,
120                 Volumes:      map[string]struct{}{},
121                 OpenStdin:    spec.Stdin != nil,
122                 StdinOnce:    spec.Stdin != nil,
123                 AttachStdin:  spec.Stdin != nil,
124                 AttachStdout: true,
125                 AttachStderr: true,
126         }
127         if cfg.WorkingDir == "." {
128                 cfg.WorkingDir = ""
129         }
130         for k, v := range spec.Env {
131                 cfg.Env = append(cfg.Env, k+"="+v)
132         }
133         if spec.RAM > 0 && spec.RAM < minDockerRAM {
134                 spec.RAM = minDockerRAM
135         }
136         hostCfg := dockercontainer.HostConfig{
137                 LogConfig: dockercontainer.LogConfig{
138                         Type: "none",
139                 },
140                 NetworkMode: dockercontainer.NetworkMode("none"),
141                 Resources: dockercontainer.Resources{
142                         CgroupParent: spec.CgroupParent,
143                         NanoCPUs:     int64(spec.VCPUs) * 1000000000,
144                         Memory:       spec.RAM, // RAM
145                         MemorySwap:   spec.RAM, // RAM+swap
146                         KernelMemory: spec.RAM, // kernel portion
147                 },
148         }
149         if spec.GPUStack == "cuda" && spec.GPUDeviceCount > 0 {
150                 var deviceIds []string
151                 if cudaVisibleDevices := os.Getenv("CUDA_VISIBLE_DEVICES"); cudaVisibleDevices != "" {
152                         // If a resource manager such as slurm or LSF told
153                         // us to select specific devices we need to propagate that.
154                         deviceIds = strings.Split(cudaVisibleDevices, ",")
155                 }
156
157                 deviceCount := spec.GPUDeviceCount
158                 if len(deviceIds) > 0 {
159                         // Docker won't accept both non-empty
160                         // DeviceIDs and a non-zero Count
161                         //
162                         // (it turns out "Count" is a dumb fallback
163                         // that just allocates device 0, 1, 2, ...,
164                         // Count-1)
165                         deviceCount = 0
166                 }
167
168                 // Capabilities are confusing.  The driver has generic
169                 // capabilities "gpu" and "nvidia" but then there's
170                 // additional capabilities "compute" and "utility"
171                 // that are passed to nvidia-container-cli.
172                 //
173                 // "compute" means include the CUDA libraries and
174                 // "utility" means include the CUDA utility programs
175                 // (like nvidia-smi).
176                 //
177                 // https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/daemon/nvidia_linux.go#L37
178                 // https://github.com/containerd/containerd/blob/main/contrib/nvidia/nvidia.go
179                 hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
180                         Driver:       "nvidia",
181                         Count:        deviceCount,
182                         DeviceIDs:    deviceIds,
183                         Capabilities: [][]string{[]string{"gpu", "nvidia", "compute", "utility"}},
184                 })
185         }
186         if spec.GPUStack == "rocm" && spec.GPUDeviceCount > 0 {
187                 // there's no container toolkit or builtin Docker
188                 // support for ROCm so we just provide the devices to
189                 // the container ourselves.
190
191                 // fortunately, the minimum version of this seems to be this:
192                 // rendergroup=$(getent group render | cut -d: -f3)
193                 // videogroup=$(getent group video | cut -d: -f3)
194                 // docker run -it --device=/dev/kfd --device=/dev/dri/renderD128 --user $(id -u) --group-add $videogroup --group-add $rendergroup "$@"
195
196                 hostCfg.Devices = append(hostCfg.Devices, dockercontainer.DeviceMapping{
197                         PathInContainer:   "/dev/kfd",
198                         PathOnHost:        "/dev/kfd",
199                         CgroupPermissions: "rwm",
200                 })
201                 info, _ := os.Stat("/dev/kfd")
202                 if stat, ok := info.Sys().(*syscall.Stat_t); ok {
203                         // Make sure the container has access
204                         // to the group id that allow it to
205                         // access the device.
206                         hostCfg.GroupAdd = append(hostCfg.GroupAdd, fmt.Sprintf("%v", stat.Gid))
207                 }
208
209                 var deviceIndexes []int
210                 if amdVisibleDevices := os.Getenv("AMD_VISIBLE_DEVICES"); amdVisibleDevices != "" {
211                         // If a resource manager/dispatcher told us to
212                         // select specific devices, so we need to
213                         // propagate that.
214                         for _, dev := range strings.Split(amdVisibleDevices, ",") {
215                                 intDev, err := strconv.Atoi(dev)
216                                 if err != nil {
217                                         continue
218                                 }
219                                 deviceIndexes = append(deviceIndexes, intDev)
220                         }
221                 } else {
222                         // Try every device, we'll check below to see
223                         // which ones actually exists.
224                         for i := 0; i < 128; i++ {
225                                 deviceIndexes = append(deviceIndexes, i)
226                         }
227                 }
228                 for _, intDev := range deviceIndexes {
229                         devPath := fmt.Sprintf("/dev/dri/renderD%v", 128+intDev)
230                         info, err := os.Stat(devPath)
231                         if err != nil {
232                                 continue
233                         }
234                         hostCfg.Devices = append(hostCfg.Devices, dockercontainer.DeviceMapping{
235                                 PathInContainer:   devPath,
236                                 PathOnHost:        devPath,
237                                 CgroupPermissions: "rwm",
238                         })
239                         if stat, ok := info.Sys().(*syscall.Stat_t); ok {
240                                 // Make sure the container has access
241                                 // to the group id that allow it to
242                                 // access the device.
243                                 if !slices.Contains(hostCfg.GroupAdd, fmt.Sprintf("%v", stat.Gid)) {
244                                         hostCfg.GroupAdd = append(hostCfg.GroupAdd, fmt.Sprintf("%v", stat.Gid))
245                                 }
246                         }
247                 }
248         }
249
250         for path, mount := range spec.BindMounts {
251                 bind := mount.HostPath + ":" + path
252                 if mount.ReadOnly {
253                         bind += ":ro"
254                 }
255                 hostCfg.Binds = append(hostCfg.Binds, bind)
256         }
257         if spec.EnableNetwork {
258                 hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
259         }
260         return cfg, hostCfg
261 }
262
263 func (e *dockerExecutor) Create(spec containerSpec) error {
264         cfg, hostCfg := e.config(spec)
265         created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, nil, e.containerUUID)
266         if err != nil {
267                 return fmt.Errorf("While creating container: %v", err)
268         }
269         e.containerID = created.ID
270         return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
271 }
272
273 func (e *dockerExecutor) Pid() int {
274         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
275         defer cancel()
276         ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
277         if err == nil && ctr.State != nil {
278                 return ctr.State.Pid
279         } else {
280                 return 0
281         }
282 }
283
284 func (e *dockerExecutor) Start() error {
285         return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockercontainer.StartOptions{})
286 }
287
288 func (e *dockerExecutor) Stop() error {
289         err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockercontainer.RemoveOptions{Force: true})
290         if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
291                 err = nil
292         }
293         return err
294 }
295
296 // Wait for the container to terminate, capture the exit code, and
297 // wait for stdout/stderr logging to finish.
298 func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
299         ctx, cancel := context.WithCancel(ctx)
300         defer cancel()
301         watchdogErr := make(chan error, 1)
302         go func() {
303                 ticker := time.NewTicker(e.watchdogInterval)
304                 defer ticker.Stop()
305                 for range ticker.C {
306                         dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
307                         ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
308                         dcancel()
309                         if ctx.Err() != nil {
310                                 // Either the container already
311                                 // exited, or our caller is trying to
312                                 // kill it.
313                                 return
314                         } else if err != nil {
315                                 watchdogErr <- fmt.Errorf("error inspecting container: %s", err)
316                         } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
317                                 watchdogErr <- fmt.Errorf("container is not running: State=%v", ctr.State)
318                         } else {
319                                 watchdogErr <- nil
320                         }
321                 }
322         }()
323
324         waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
325         errors := 0
326         for {
327                 select {
328                 case waitBody := <-waitOk:
329                         // wait for stdout/stderr to complete
330                         <-e.doneIO
331                         return int(waitBody.StatusCode), nil
332
333                 case err := <-waitErr:
334                         return -1, fmt.Errorf("container wait: %v", err)
335
336                 case <-ctx.Done():
337                         return -1, ctx.Err()
338
339                 case err := <-watchdogErr:
340                         if err == nil {
341                                 errors = 0
342                         } else {
343                                 e.logf("docker watchdog: %s", err)
344                                 errors++
345                                 if errors >= dockerWatchdogThreshold {
346                                         e.logf("docker watchdog: giving up")
347                                         return -1, err
348                                 }
349                         }
350                 }
351         }
352 }
353
354 func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
355         resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockercontainer.AttachOptions{
356                 Stream: true,
357                 Stdin:  stdin != nil,
358                 Stdout: true,
359                 Stderr: true,
360         })
361         if err != nil {
362                 return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
363         }
364         var errStdin error
365         if stdin != nil {
366                 go func() {
367                         errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
368                 }()
369         }
370         e.doneIO = make(chan struct{})
371         go func() {
372                 e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
373                 if e.errIO == nil && errStdin != nil {
374                         e.errIO = errStdin
375                 }
376                 close(e.doneIO)
377         }()
378         return nil
379 }
380
381 func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
382         defer closeConn()
383         _, err := io.Copy(conn, stdin)
384         if err != nil {
385                 return fmt.Errorf("While writing to docker container on stdin: %v", err)
386         }
387         return nil
388 }
389
390 // Handle docker log protocol; see
391 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
392 func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
393         header := make([]byte, 8)
394         var err error
395         for err == nil {
396                 _, err = io.ReadAtLeast(reader, header, 8)
397                 if err != nil {
398                         if err == io.EOF {
399                                 err = nil
400                         }
401                         break
402                 }
403                 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
404                 if header[0] == 1 {
405                         _, err = io.CopyN(stdout, reader, readsize)
406                 } else {
407                         // stderr
408                         _, err = io.CopyN(stderr, reader, readsize)
409                 }
410         }
411         if err != nil {
412                 return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
413         }
414         return nil
415 }
416
417 func (e *dockerExecutor) Close() {
418         e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockercontainer.RemoveOptions{Force: true})
419 }
420
421 func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
422         cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
423         if usingTTY {
424                 cmd.Args = append(cmd.Args, "-t")
425         }
426         cmd.Args = append(cmd.Args, e.containerID)
427         cmd.Args = append(cmd.Args, injectcmd...)
428         return cmd, nil
429 }
430
431 func (e *dockerExecutor) IPAddress() (string, error) {
432         if ip, ok := e.savedIPAddress.Load().(*string); ok {
433                 return *ip, nil
434         }
435         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
436         defer cancel()
437         ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
438         if err != nil {
439                 return "", fmt.Errorf("cannot get docker container info: %s", err)
440         }
441         ip := ctr.NetworkSettings.IPAddress
442         if ip == "" {
443                 // TODO: try to enable networking if it wasn't
444                 // already enabled when the container was
445                 // created.
446                 return "", fmt.Errorf("container has no IP address")
447         }
448         e.savedIPAddress.Store(&ip)
449         return ip, nil
450 }