Merge branch '19179-acct-activity' refs #19179
[arvados.git] / lib / crunchrun / docker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4 package crunchrun
5
6 import (
7         "fmt"
8         "io"
9         "io/ioutil"
10         "os"
11         "os/exec"
12         "strings"
13         "sync/atomic"
14         "time"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         dockertypes "github.com/docker/docker/api/types"
18         dockercontainer "github.com/docker/docker/api/types/container"
19         dockerclient "github.com/docker/docker/client"
20         "golang.org/x/net/context"
21 )
22
23 // Docker daemon won't let you set a limit less than ~10 MiB
24 const minDockerRAM = int64(16 * 1024 * 1024)
25
26 // DockerAPIVersion is the API version we use to communicate with the
27 // docker service.  The oldest OS we support is Ubuntu 18.04 (bionic)
28 // which originally shipped docker 1.17.12 / API 1.35 so there is no
29 // reason to use an older API version.  See
30 // https://dev.arvados.org/issues/15370#note-38 and
31 // https://docs.docker.com/engine/api/.
32 const DockerAPIVersion = "1.35"
33
34 type dockerExecutor struct {
35         containerUUID    string
36         logf             func(string, ...interface{})
37         watchdogInterval time.Duration
38         dockerclient     *dockerclient.Client
39         containerID      string
40         savedIPAddress   atomic.Value
41         doneIO           chan struct{}
42         errIO            error
43 }
44
45 func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
46         // API version 1.21 corresponds to Docker 1.9, which is
47         // currently the minimum version we want to support.
48         client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, DockerAPIVersion, nil, nil)
49         if watchdogInterval < 1 {
50                 watchdogInterval = time.Minute
51         }
52         return &dockerExecutor{
53                 containerUUID:    containerUUID,
54                 logf:             logf,
55                 watchdogInterval: watchdogInterval,
56                 dockerclient:     client,
57         }, err
58 }
59
60 func (e *dockerExecutor) Runtime() string {
61         v, _ := e.dockerclient.ServerVersion(context.Background())
62         info := ""
63         for _, cv := range v.Components {
64                 if info != "" {
65                         info += ", "
66                 }
67                 info += cv.Name + " " + cv.Version
68         }
69         if info == "" {
70                 info = "(unknown version)"
71         }
72         return "docker " + info
73 }
74
75 func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
76         containerClient *arvados.Client) error {
77         _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
78         if err == nil {
79                 // already loaded
80                 return nil
81         }
82
83         f, err := os.Open(imageTarballPath)
84         if err != nil {
85                 return err
86         }
87         defer f.Close()
88         resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
89         if err != nil {
90                 return fmt.Errorf("While loading container image into Docker: %v", err)
91         }
92         defer resp.Body.Close()
93         buf, _ := ioutil.ReadAll(resp.Body)
94         e.logf("loaded image: response %s", buf)
95         return nil
96 }
97
98 func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, dockercontainer.HostConfig) {
99         e.logf("Creating Docker container")
100         cfg := dockercontainer.Config{
101                 Image:        spec.Image,
102                 Cmd:          spec.Command,
103                 WorkingDir:   spec.WorkingDir,
104                 Volumes:      map[string]struct{}{},
105                 OpenStdin:    spec.Stdin != nil,
106                 StdinOnce:    spec.Stdin != nil,
107                 AttachStdin:  spec.Stdin != nil,
108                 AttachStdout: true,
109                 AttachStderr: true,
110         }
111         if cfg.WorkingDir == "." {
112                 cfg.WorkingDir = ""
113         }
114         for k, v := range spec.Env {
115                 cfg.Env = append(cfg.Env, k+"="+v)
116         }
117         if spec.RAM > 0 && spec.RAM < minDockerRAM {
118                 spec.RAM = minDockerRAM
119         }
120         hostCfg := dockercontainer.HostConfig{
121                 LogConfig: dockercontainer.LogConfig{
122                         Type: "none",
123                 },
124                 NetworkMode: dockercontainer.NetworkMode("none"),
125                 Resources: dockercontainer.Resources{
126                         CgroupParent: spec.CgroupParent,
127                         NanoCPUs:     int64(spec.VCPUs) * 1000000000,
128                         Memory:       spec.RAM, // RAM
129                         MemorySwap:   spec.RAM, // RAM+swap
130                         KernelMemory: spec.RAM, // kernel portion
131                 },
132         }
133         if spec.CUDADeviceCount != 0 {
134                 var deviceIds []string
135                 if cudaVisibleDevices := os.Getenv("CUDA_VISIBLE_DEVICES"); cudaVisibleDevices != "" {
136                         // If a resource manager such as slurm or LSF told
137                         // us to select specific devices we need to propagate that.
138                         deviceIds = strings.Split(cudaVisibleDevices, ",")
139                 }
140
141                 deviceCount := spec.CUDADeviceCount
142                 if len(deviceIds) > 0 {
143                         // Docker won't accept both non-empty
144                         // DeviceIDs and a non-zero Count
145                         //
146                         // (it turns out "Count" is a dumb fallback
147                         // that just allocates device 0, 1, 2, ...,
148                         // Count-1)
149                         deviceCount = 0
150                 }
151
152                 // Capabilities are confusing.  The driver has generic
153                 // capabilities "gpu" and "nvidia" but then there's
154                 // additional capabilities "compute" and "utility"
155                 // that are passed to nvidia-container-cli.
156                 //
157                 // "compute" means include the CUDA libraries and
158                 // "utility" means include the CUDA utility programs
159                 // (like nvidia-smi).
160                 //
161                 // https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/daemon/nvidia_linux.go#L37
162                 // https://github.com/containerd/containerd/blob/main/contrib/nvidia/nvidia.go
163                 hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
164                         Driver:       "nvidia",
165                         Count:        deviceCount,
166                         DeviceIDs:    deviceIds,
167                         Capabilities: [][]string{[]string{"gpu", "nvidia", "compute", "utility"}},
168                 })
169         }
170         for path, mount := range spec.BindMounts {
171                 bind := mount.HostPath + ":" + path
172                 if mount.ReadOnly {
173                         bind += ":ro"
174                 }
175                 hostCfg.Binds = append(hostCfg.Binds, bind)
176         }
177         if spec.EnableNetwork {
178                 hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
179         }
180         return cfg, hostCfg
181 }
182
183 func (e *dockerExecutor) Create(spec containerSpec) error {
184         cfg, hostCfg := e.config(spec)
185         created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, e.containerUUID)
186         if err != nil {
187                 return fmt.Errorf("While creating container: %v", err)
188         }
189         e.containerID = created.ID
190         return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
191 }
192
193 func (e *dockerExecutor) CgroupID() string {
194         return e.containerID
195 }
196
197 func (e *dockerExecutor) Start() error {
198         return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockertypes.ContainerStartOptions{})
199 }
200
201 func (e *dockerExecutor) Stop() error {
202         err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
203         if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
204                 err = nil
205         }
206         return err
207 }
208
209 // Wait for the container to terminate, capture the exit code, and
210 // wait for stdout/stderr logging to finish.
211 func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
212         ctx, cancel := context.WithCancel(ctx)
213         defer cancel()
214         watchdogErr := make(chan error, 1)
215         go func() {
216                 ticker := time.NewTicker(e.watchdogInterval)
217                 defer ticker.Stop()
218                 for range ticker.C {
219                         dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
220                         ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
221                         dcancel()
222                         if ctx.Err() != nil {
223                                 // Either the container already
224                                 // exited, or our caller is trying to
225                                 // kill it.
226                                 return
227                         } else if err != nil {
228                                 e.logf("Error inspecting container: %s", err)
229                                 watchdogErr <- err
230                                 return
231                         } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
232                                 watchdogErr <- fmt.Errorf("Container is not running: State=%v", ctr.State)
233                                 return
234                         }
235                 }
236         }()
237
238         waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
239         for {
240                 select {
241                 case waitBody := <-waitOk:
242                         // wait for stdout/stderr to complete
243                         <-e.doneIO
244                         return int(waitBody.StatusCode), nil
245
246                 case err := <-waitErr:
247                         return -1, fmt.Errorf("container wait: %v", err)
248
249                 case <-ctx.Done():
250                         return -1, ctx.Err()
251
252                 case err := <-watchdogErr:
253                         return -1, err
254                 }
255         }
256 }
257
258 func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
259         resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
260                 Stream: true,
261                 Stdin:  stdin != nil,
262                 Stdout: true,
263                 Stderr: true,
264         })
265         if err != nil {
266                 return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
267         }
268         var errStdin error
269         if stdin != nil {
270                 go func() {
271                         errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
272                 }()
273         }
274         e.doneIO = make(chan struct{})
275         go func() {
276                 e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
277                 if e.errIO == nil && errStdin != nil {
278                         e.errIO = errStdin
279                 }
280                 close(e.doneIO)
281         }()
282         return nil
283 }
284
285 func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
286         defer closeConn()
287         _, err := io.Copy(conn, stdin)
288         if err != nil {
289                 return fmt.Errorf("While writing to docker container on stdin: %v", err)
290         }
291         return nil
292 }
293
294 // Handle docker log protocol; see
295 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
296 func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
297         header := make([]byte, 8)
298         var err error
299         for err == nil {
300                 _, err = io.ReadAtLeast(reader, header, 8)
301                 if err != nil {
302                         if err == io.EOF {
303                                 err = nil
304                         }
305                         break
306                 }
307                 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
308                 if header[0] == 1 {
309                         _, err = io.CopyN(stdout, reader, readsize)
310                 } else {
311                         // stderr
312                         _, err = io.CopyN(stderr, reader, readsize)
313                 }
314         }
315         if err != nil {
316                 return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
317         }
318         return nil
319 }
320
321 func (e *dockerExecutor) Close() {
322         e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
323 }
324
325 func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
326         cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
327         if usingTTY {
328                 cmd.Args = append(cmd.Args, "-t")
329         }
330         cmd.Args = append(cmd.Args, e.containerID)
331         cmd.Args = append(cmd.Args, injectcmd...)
332         return cmd, nil
333 }
334
335 func (e *dockerExecutor) IPAddress() (string, error) {
336         if ip, ok := e.savedIPAddress.Load().(*string); ok {
337                 return *ip, nil
338         }
339         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
340         defer cancel()
341         ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
342         if err != nil {
343                 return "", fmt.Errorf("cannot get docker container info: %s", err)
344         }
345         ip := ctr.NetworkSettings.IPAddress
346         if ip == "" {
347                 // TODO: try to enable networking if it wasn't
348                 // already enabled when the container was
349                 // created.
350                 return "", fmt.Errorf("container has no IP address")
351         }
352         e.savedIPAddress.Store(&ip)
353         return ip, nil
354 }