Merge branch '19129-wb1-dashboard-speedup' refs #19129
[arvados.git] / lib / crunchrun / docker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4 package crunchrun
5
6 import (
7         "fmt"
8         "io"
9         "io/ioutil"
10         "os"
11         "os/exec"
12         "strings"
13         "sync/atomic"
14         "time"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         dockertypes "github.com/docker/docker/api/types"
18         dockercontainer "github.com/docker/docker/api/types/container"
19         dockerclient "github.com/docker/docker/client"
20         "golang.org/x/net/context"
21 )
22
23 // Docker daemon won't let you set a limit less than ~10 MiB
24 const minDockerRAM = int64(16 * 1024 * 1024)
25
26 type dockerExecutor struct {
27         containerUUID    string
28         logf             func(string, ...interface{})
29         watchdogInterval time.Duration
30         dockerclient     *dockerclient.Client
31         containerID      string
32         savedIPAddress   atomic.Value
33         doneIO           chan struct{}
34         errIO            error
35 }
36
37 func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
38         // API version 1.21 corresponds to Docker 1.9, which is
39         // currently the minimum version we want to support.
40         client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
41         if watchdogInterval < 1 {
42                 watchdogInterval = time.Minute
43         }
44         return &dockerExecutor{
45                 containerUUID:    containerUUID,
46                 logf:             logf,
47                 watchdogInterval: watchdogInterval,
48                 dockerclient:     client,
49         }, err
50 }
51
52 func (e *dockerExecutor) Runtime() string {
53         v, _ := e.dockerclient.ServerVersion(context.Background())
54         info := ""
55         for _, cv := range v.Components {
56                 if info != "" {
57                         info += ", "
58                 }
59                 info += cv.Name + " " + cv.Version
60         }
61         if info == "" {
62                 info = "(unknown version)"
63         }
64         return "docker " + info
65 }
66
67 func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
68         containerClient *arvados.Client) error {
69         _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
70         if err == nil {
71                 // already loaded
72                 return nil
73         }
74
75         f, err := os.Open(imageTarballPath)
76         if err != nil {
77                 return err
78         }
79         defer f.Close()
80         resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
81         if err != nil {
82                 return fmt.Errorf("While loading container image into Docker: %v", err)
83         }
84         defer resp.Body.Close()
85         buf, _ := ioutil.ReadAll(resp.Body)
86         e.logf("loaded image: response %s", buf)
87         return nil
88 }
89
90 func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, dockercontainer.HostConfig) {
91         e.logf("Creating Docker container")
92         cfg := dockercontainer.Config{
93                 Image:        spec.Image,
94                 Cmd:          spec.Command,
95                 WorkingDir:   spec.WorkingDir,
96                 Volumes:      map[string]struct{}{},
97                 OpenStdin:    spec.Stdin != nil,
98                 StdinOnce:    spec.Stdin != nil,
99                 AttachStdin:  spec.Stdin != nil,
100                 AttachStdout: true,
101                 AttachStderr: true,
102         }
103         if cfg.WorkingDir == "." {
104                 cfg.WorkingDir = ""
105         }
106         for k, v := range spec.Env {
107                 cfg.Env = append(cfg.Env, k+"="+v)
108         }
109         if spec.RAM > 0 && spec.RAM < minDockerRAM {
110                 spec.RAM = minDockerRAM
111         }
112         hostCfg := dockercontainer.HostConfig{
113                 LogConfig: dockercontainer.LogConfig{
114                         Type: "none",
115                 },
116                 NetworkMode: dockercontainer.NetworkMode("none"),
117                 Resources: dockercontainer.Resources{
118                         CgroupParent: spec.CgroupParent,
119                         NanoCPUs:     int64(spec.VCPUs) * 1000000000,
120                         Memory:       spec.RAM, // RAM
121                         MemorySwap:   spec.RAM, // RAM+swap
122                         KernelMemory: spec.RAM, // kernel portion
123                 },
124         }
125         if spec.CUDADeviceCount != 0 {
126                 var deviceIds []string
127                 if cudaVisibleDevices := os.Getenv("CUDA_VISIBLE_DEVICES"); cudaVisibleDevices != "" {
128                         // If a resource manager such as slurm or LSF told
129                         // us to select specific devices we need to propagate that.
130                         deviceIds = strings.Split(cudaVisibleDevices, ",")
131                 }
132
133                 deviceCount := spec.CUDADeviceCount
134                 if len(deviceIds) > 0 {
135                         // Docker won't accept both non-empty
136                         // DeviceIDs and a non-zero Count
137                         //
138                         // (it turns out "Count" is a dumb fallback
139                         // that just allocates device 0, 1, 2, ...,
140                         // Count-1)
141                         deviceCount = 0
142                 }
143
144                 // Capabilities are confusing.  The driver has generic
145                 // capabilities "gpu" and "nvidia" but then there's
146                 // additional capabilities "compute" and "utility"
147                 // that are passed to nvidia-container-cli.
148                 //
149                 // "compute" means include the CUDA libraries and
150                 // "utility" means include the CUDA utility programs
151                 // (like nvidia-smi).
152                 //
153                 // https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/daemon/nvidia_linux.go#L37
154                 // https://github.com/containerd/containerd/blob/main/contrib/nvidia/nvidia.go
155                 hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
156                         Driver:       "nvidia",
157                         Count:        deviceCount,
158                         DeviceIDs:    deviceIds,
159                         Capabilities: [][]string{[]string{"gpu", "nvidia", "compute", "utility"}},
160                 })
161         }
162         for path, mount := range spec.BindMounts {
163                 bind := mount.HostPath + ":" + path
164                 if mount.ReadOnly {
165                         bind += ":ro"
166                 }
167                 hostCfg.Binds = append(hostCfg.Binds, bind)
168         }
169         if spec.EnableNetwork {
170                 hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
171         }
172         return cfg, hostCfg
173 }
174
175 func (e *dockerExecutor) Create(spec containerSpec) error {
176         cfg, hostCfg := e.config(spec)
177         created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, e.containerUUID)
178         if err != nil {
179                 return fmt.Errorf("While creating container: %v", err)
180         }
181         e.containerID = created.ID
182         return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
183 }
184
185 func (e *dockerExecutor) CgroupID() string {
186         return e.containerID
187 }
188
189 func (e *dockerExecutor) Start() error {
190         return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockertypes.ContainerStartOptions{})
191 }
192
193 func (e *dockerExecutor) Stop() error {
194         err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
195         if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
196                 err = nil
197         }
198         return err
199 }
200
201 // Wait for the container to terminate, capture the exit code, and
202 // wait for stdout/stderr logging to finish.
203 func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
204         ctx, cancel := context.WithCancel(ctx)
205         defer cancel()
206         watchdogErr := make(chan error, 1)
207         go func() {
208                 ticker := time.NewTicker(e.watchdogInterval)
209                 defer ticker.Stop()
210                 for range ticker.C {
211                         dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
212                         ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
213                         dcancel()
214                         if ctx.Err() != nil {
215                                 // Either the container already
216                                 // exited, or our caller is trying to
217                                 // kill it.
218                                 return
219                         } else if err != nil {
220                                 e.logf("Error inspecting container: %s", err)
221                                 watchdogErr <- err
222                                 return
223                         } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
224                                 watchdogErr <- fmt.Errorf("Container is not running: State=%v", ctr.State)
225                                 return
226                         }
227                 }
228         }()
229
230         waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
231         for {
232                 select {
233                 case waitBody := <-waitOk:
234                         // wait for stdout/stderr to complete
235                         <-e.doneIO
236                         return int(waitBody.StatusCode), nil
237
238                 case err := <-waitErr:
239                         return -1, fmt.Errorf("container wait: %v", err)
240
241                 case <-ctx.Done():
242                         return -1, ctx.Err()
243
244                 case err := <-watchdogErr:
245                         return -1, err
246                 }
247         }
248 }
249
250 func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
251         resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
252                 Stream: true,
253                 Stdin:  stdin != nil,
254                 Stdout: true,
255                 Stderr: true,
256         })
257         if err != nil {
258                 return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
259         }
260         var errStdin error
261         if stdin != nil {
262                 go func() {
263                         errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
264                 }()
265         }
266         e.doneIO = make(chan struct{})
267         go func() {
268                 e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
269                 if e.errIO == nil && errStdin != nil {
270                         e.errIO = errStdin
271                 }
272                 close(e.doneIO)
273         }()
274         return nil
275 }
276
277 func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
278         defer closeConn()
279         _, err := io.Copy(conn, stdin)
280         if err != nil {
281                 return fmt.Errorf("While writing to docker container on stdin: %v", err)
282         }
283         return nil
284 }
285
286 // Handle docker log protocol; see
287 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
288 func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
289         header := make([]byte, 8)
290         var err error
291         for err == nil {
292                 _, err = io.ReadAtLeast(reader, header, 8)
293                 if err != nil {
294                         if err == io.EOF {
295                                 err = nil
296                         }
297                         break
298                 }
299                 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
300                 if header[0] == 1 {
301                         _, err = io.CopyN(stdout, reader, readsize)
302                 } else {
303                         // stderr
304                         _, err = io.CopyN(stderr, reader, readsize)
305                 }
306         }
307         if err != nil {
308                 return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
309         }
310         return nil
311 }
312
313 func (e *dockerExecutor) Close() {
314         e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
315 }
316
317 func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
318         cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
319         if usingTTY {
320                 cmd.Args = append(cmd.Args, "-t")
321         }
322         cmd.Args = append(cmd.Args, e.containerID)
323         cmd.Args = append(cmd.Args, injectcmd...)
324         return cmd, nil
325 }
326
327 func (e *dockerExecutor) IPAddress() (string, error) {
328         if ip, ok := e.savedIPAddress.Load().(*string); ok {
329                 return *ip, nil
330         }
331         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
332         defer cancel()
333         ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
334         if err != nil {
335                 return "", fmt.Errorf("cannot get docker container info: %s", err)
336         }
337         ip := ctr.NetworkSettings.IPAddress
338         if ip == "" {
339                 // TODO: try to enable networking if it wasn't
340                 // already enabled when the container was
341                 // created.
342                 return "", fmt.Errorf("container has no IP address")
343         }
344         e.savedIPAddress.Store(&ip)
345         return ip, nil
346 }