]> git.arvados.org - arvados.git/blob - lib/crunchrun/docker.go
17209: Merge branch 'main' into 17209-http-forward
[arvados.git] / lib / crunchrun / docker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4 package crunchrun
5
6 import (
7         "context"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "os"
12         "os/exec"
13         "slices"
14         "strconv"
15         "strings"
16         "sync/atomic"
17         "syscall"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         dockercontainer "github.com/docker/docker/api/types/container"
22         dockerclient "github.com/docker/docker/client"
23 )
24
25 // Docker daemon won't let you set a limit less than ~10 MiB
26 const minDockerRAM = int64(16 * 1024 * 1024)
27
28 // DockerAPIVersion is the API version we use to communicate with the
29 // docker service.  The oldest OS we support is Ubuntu 18.04 (bionic)
30 // which originally shipped docker 1.17.12 / API 1.35 so there is no
31 // reason to use an older API version.  See
32 // https://dev.arvados.org/issues/15370#note-38 and
33 // https://docs.docker.com/engine/api/.
34 const DockerAPIVersion = "1.35"
35
36 // Number of consecutive "inspect container" failures before
37 // concluding Docker is unresponsive, giving up, and cancelling the
38 // container.
39 const dockerWatchdogThreshold = 5
40
41 type dockerExecutor struct {
42         containerUUID    string
43         logf             func(string, ...interface{})
44         watchdogInterval time.Duration
45         dockerclient     *dockerclient.Client
46         containerID      string
47         savedIPAddress   atomic.Value
48         doneIO           chan struct{}
49         errIO            error
50 }
51
52 func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
53         // API version 1.21 corresponds to Docker 1.9, which is
54         // currently the minimum version we want to support.
55         client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, DockerAPIVersion, nil, nil)
56         if watchdogInterval < 1 {
57                 watchdogInterval = time.Minute * 2
58         }
59         return &dockerExecutor{
60                 containerUUID:    containerUUID,
61                 logf:             logf,
62                 watchdogInterval: watchdogInterval,
63                 dockerclient:     client,
64         }, err
65 }
66
67 func (e *dockerExecutor) Runtime() string {
68         v, _ := e.dockerclient.ServerVersion(context.Background())
69         info := ""
70         for _, cv := range v.Components {
71                 if info != "" {
72                         info += ", "
73                 }
74                 info += cv.Name + " " + cv.Version
75         }
76         if info == "" {
77                 info = "(unknown version)"
78         }
79         return "docker " + info
80 }
81
82 func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
83         containerClient *arvados.Client) error {
84         _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
85         if err == nil {
86                 // already loaded
87                 return nil
88         }
89
90         f, err := os.Open(imageTarballPath)
91         if err != nil {
92                 return err
93         }
94         defer f.Close()
95         resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
96         if err != nil {
97                 return fmt.Errorf("While loading container image into Docker: %v", err)
98         }
99         defer resp.Body.Close()
100         buf, _ := ioutil.ReadAll(resp.Body)
101         e.logf("loaded image: response %s", buf)
102         return nil
103 }
104
105 func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, dockercontainer.HostConfig) {
106         e.logf("Creating Docker container")
107         cfg := dockercontainer.Config{
108                 Image:        spec.Image,
109                 Cmd:          spec.Command,
110                 WorkingDir:   spec.WorkingDir,
111                 Volumes:      map[string]struct{}{},
112                 OpenStdin:    spec.Stdin != nil,
113                 StdinOnce:    spec.Stdin != nil,
114                 AttachStdin:  spec.Stdin != nil,
115                 AttachStdout: true,
116                 AttachStderr: true,
117         }
118         if cfg.WorkingDir == "." {
119                 cfg.WorkingDir = ""
120         }
121         for k, v := range spec.Env {
122                 cfg.Env = append(cfg.Env, k+"="+v)
123         }
124         if spec.RAM > 0 && spec.RAM < minDockerRAM {
125                 spec.RAM = minDockerRAM
126         }
127         hostCfg := dockercontainer.HostConfig{
128                 LogConfig: dockercontainer.LogConfig{
129                         Type: "none",
130                 },
131                 NetworkMode: dockercontainer.NetworkMode("none"),
132                 Resources: dockercontainer.Resources{
133                         CgroupParent: spec.CgroupParent,
134                         NanoCPUs:     int64(spec.VCPUs) * 1000000000,
135                         Memory:       spec.RAM, // RAM
136                         MemorySwap:   spec.RAM, // RAM+swap
137                         KernelMemory: spec.RAM, // kernel portion
138                 },
139         }
140         if spec.GPUStack == "cuda" && spec.GPUDeviceCount > 0 {
141                 var deviceIds []string
142                 if cudaVisibleDevices := os.Getenv("CUDA_VISIBLE_DEVICES"); cudaVisibleDevices != "" {
143                         // If a resource manager such as slurm or LSF told
144                         // us to select specific devices we need to propagate that.
145                         deviceIds = strings.Split(cudaVisibleDevices, ",")
146                 }
147
148                 deviceCount := spec.GPUDeviceCount
149                 if len(deviceIds) > 0 {
150                         // Docker won't accept both non-empty
151                         // DeviceIDs and a non-zero Count
152                         //
153                         // (it turns out "Count" is a dumb fallback
154                         // that just allocates device 0, 1, 2, ...,
155                         // Count-1)
156                         deviceCount = 0
157                 }
158
159                 // Capabilities are confusing.  The driver has generic
160                 // capabilities "gpu" and "nvidia" but then there's
161                 // additional capabilities "compute" and "utility"
162                 // that are passed to nvidia-container-cli.
163                 //
164                 // "compute" means include the CUDA libraries and
165                 // "utility" means include the CUDA utility programs
166                 // (like nvidia-smi).
167                 //
168                 // https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/daemon/nvidia_linux.go#L37
169                 // https://github.com/containerd/containerd/blob/main/contrib/nvidia/nvidia.go
170                 hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
171                         Driver:       "nvidia",
172                         Count:        deviceCount,
173                         DeviceIDs:    deviceIds,
174                         Capabilities: [][]string{[]string{"gpu", "nvidia", "compute", "utility"}},
175                 })
176         }
177         if spec.GPUStack == "rocm" && spec.GPUDeviceCount > 0 {
178                 // there's no container toolkit or builtin Docker
179                 // support for ROCm so we just provide the devices to
180                 // the container ourselves.
181
182                 // fortunately, the minimum version of this seems to be this:
183                 // rendergroup=$(getent group render | cut -d: -f3)
184                 // videogroup=$(getent group video | cut -d: -f3)
185                 // docker run -it --device=/dev/kfd --device=/dev/dri/renderD128 --user $(id -u) --group-add $videogroup --group-add $rendergroup "$@"
186
187                 hostCfg.Devices = append(hostCfg.Devices, dockercontainer.DeviceMapping{
188                         PathInContainer:   "/dev/kfd",
189                         PathOnHost:        "/dev/kfd",
190                         CgroupPermissions: "rwm",
191                 })
192                 info, _ := os.Stat("/dev/kfd")
193                 if stat, ok := info.Sys().(*syscall.Stat_t); ok {
194                         // Make sure the container has access
195                         // to the group id that allow it to
196                         // access the device.
197                         hostCfg.GroupAdd = append(hostCfg.GroupAdd, fmt.Sprintf("%v", stat.Gid))
198                 }
199
200                 var deviceIndexes []int
201                 if amdVisibleDevices := os.Getenv("AMD_VISIBLE_DEVICES"); amdVisibleDevices != "" {
202                         // If a resource manager/dispatcher told us to
203                         // select specific devices, so we need to
204                         // propagate that.
205                         for _, dev := range strings.Split(amdVisibleDevices, ",") {
206                                 intDev, err := strconv.Atoi(dev)
207                                 if err != nil {
208                                         continue
209                                 }
210                                 deviceIndexes = append(deviceIndexes, intDev)
211                         }
212                 } else {
213                         // Try every device, we'll check below to see
214                         // which ones actually exists.
215                         for i := 0; i < 128; i++ {
216                                 deviceIndexes = append(deviceIndexes, i)
217                         }
218                 }
219                 for _, intDev := range deviceIndexes {
220                         devPath := fmt.Sprintf("/dev/dri/renderD%v", 128+intDev)
221                         info, err := os.Stat(devPath)
222                         if err != nil {
223                                 continue
224                         }
225                         hostCfg.Devices = append(hostCfg.Devices, dockercontainer.DeviceMapping{
226                                 PathInContainer:   devPath,
227                                 PathOnHost:        devPath,
228                                 CgroupPermissions: "rwm",
229                         })
230                         if stat, ok := info.Sys().(*syscall.Stat_t); ok {
231                                 // Make sure the container has access
232                                 // to the group id that allow it to
233                                 // access the device.
234                                 if !slices.Contains(hostCfg.GroupAdd, fmt.Sprintf("%v", stat.Gid)) {
235                                         hostCfg.GroupAdd = append(hostCfg.GroupAdd, fmt.Sprintf("%v", stat.Gid))
236                                 }
237                         }
238                 }
239         }
240
241         for path, mount := range spec.BindMounts {
242                 bind := mount.HostPath + ":" + path
243                 if mount.ReadOnly {
244                         bind += ":ro"
245                 }
246                 hostCfg.Binds = append(hostCfg.Binds, bind)
247         }
248         if spec.EnableNetwork {
249                 hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
250         }
251         return cfg, hostCfg
252 }
253
254 func (e *dockerExecutor) Create(spec containerSpec) error {
255         cfg, hostCfg := e.config(spec)
256         created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, nil, e.containerUUID)
257         if err != nil {
258                 return fmt.Errorf("While creating container: %v", err)
259         }
260         e.containerID = created.ID
261         return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
262 }
263
264 func (e *dockerExecutor) Pid() int {
265         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
266         defer cancel()
267         ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
268         if err == nil && ctr.State != nil {
269                 return ctr.State.Pid
270         } else {
271                 return 0
272         }
273 }
274
275 func (e *dockerExecutor) Start() error {
276         return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockercontainer.StartOptions{})
277 }
278
279 func (e *dockerExecutor) Stop() error {
280         err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockercontainer.RemoveOptions{Force: true})
281         if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
282                 err = nil
283         }
284         return err
285 }
286
287 // Wait for the container to terminate, capture the exit code, and
288 // wait for stdout/stderr logging to finish.
289 func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
290         ctx, cancel := context.WithCancel(ctx)
291         defer cancel()
292         watchdogErr := make(chan error, 1)
293         go func() {
294                 ticker := time.NewTicker(e.watchdogInterval)
295                 defer ticker.Stop()
296                 for range ticker.C {
297                         dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
298                         ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
299                         dcancel()
300                         if ctx.Err() != nil {
301                                 // Either the container already
302                                 // exited, or our caller is trying to
303                                 // kill it.
304                                 return
305                         } else if err != nil {
306                                 watchdogErr <- fmt.Errorf("error inspecting container: %s", err)
307                         } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
308                                 watchdogErr <- fmt.Errorf("container is not running: State=%v", ctr.State)
309                         } else {
310                                 watchdogErr <- nil
311                         }
312                 }
313         }()
314
315         waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
316         errors := 0
317         for {
318                 select {
319                 case waitBody := <-waitOk:
320                         // wait for stdout/stderr to complete
321                         <-e.doneIO
322                         return int(waitBody.StatusCode), nil
323
324                 case err := <-waitErr:
325                         return -1, fmt.Errorf("container wait: %v", err)
326
327                 case <-ctx.Done():
328                         return -1, ctx.Err()
329
330                 case err := <-watchdogErr:
331                         if err == nil {
332                                 errors = 0
333                         } else {
334                                 e.logf("docker watchdog: %s", err)
335                                 errors++
336                                 if errors >= dockerWatchdogThreshold {
337                                         e.logf("docker watchdog: giving up")
338                                         return -1, err
339                                 }
340                         }
341                 }
342         }
343 }
344
345 func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
346         resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockercontainer.AttachOptions{
347                 Stream: true,
348                 Stdin:  stdin != nil,
349                 Stdout: true,
350                 Stderr: true,
351         })
352         if err != nil {
353                 return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
354         }
355         var errStdin error
356         if stdin != nil {
357                 go func() {
358                         errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
359                 }()
360         }
361         e.doneIO = make(chan struct{})
362         go func() {
363                 e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
364                 if e.errIO == nil && errStdin != nil {
365                         e.errIO = errStdin
366                 }
367                 close(e.doneIO)
368         }()
369         return nil
370 }
371
372 func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
373         defer closeConn()
374         _, err := io.Copy(conn, stdin)
375         if err != nil {
376                 return fmt.Errorf("While writing to docker container on stdin: %v", err)
377         }
378         return nil
379 }
380
381 // Handle docker log protocol; see
382 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
383 func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
384         header := make([]byte, 8)
385         var err error
386         for err == nil {
387                 _, err = io.ReadAtLeast(reader, header, 8)
388                 if err != nil {
389                         if err == io.EOF {
390                                 err = nil
391                         }
392                         break
393                 }
394                 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
395                 if header[0] == 1 {
396                         _, err = io.CopyN(stdout, reader, readsize)
397                 } else {
398                         // stderr
399                         _, err = io.CopyN(stderr, reader, readsize)
400                 }
401         }
402         if err != nil {
403                 return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
404         }
405         return nil
406 }
407
408 func (e *dockerExecutor) Close() {
409         e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockercontainer.RemoveOptions{Force: true})
410 }
411
412 func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
413         cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
414         if usingTTY {
415                 cmd.Args = append(cmd.Args, "-t")
416         }
417         cmd.Args = append(cmd.Args, e.containerID)
418         cmd.Args = append(cmd.Args, injectcmd...)
419         return cmd, nil
420 }
421
422 func (e *dockerExecutor) IPAddress() (string, error) {
423         if ip, ok := e.savedIPAddress.Load().(*string); ok {
424                 return *ip, nil
425         }
426         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
427         defer cancel()
428         ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
429         if err != nil {
430                 return "", fmt.Errorf("cannot get docker container info: %s", err)
431         }
432         ip := ctr.NetworkSettings.IPAddress
433         if ip == "" {
434                 // TODO: try to enable networking if it wasn't
435                 // already enabled when the container was
436                 // created.
437                 return "", fmt.Errorf("container has no IP address")
438         }
439         e.savedIPAddress.Store(&ip)
440         return ip, nil
441 }