19094: Note docker/singularity/arv-mount versions in container log.
[arvados.git] / lib / crunchrun / docker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4 package crunchrun
5
6 import (
7         "fmt"
8         "io"
9         "io/ioutil"
10         "os"
11         "strings"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         dockertypes "github.com/docker/docker/api/types"
16         dockercontainer "github.com/docker/docker/api/types/container"
17         dockerclient "github.com/docker/docker/client"
18         "golang.org/x/net/context"
19 )
20
21 // Docker daemon won't let you set a limit less than ~10 MiB
22 const minDockerRAM = int64(16 * 1024 * 1024)
23
24 type dockerExecutor struct {
25         containerUUID    string
26         logf             func(string, ...interface{})
27         watchdogInterval time.Duration
28         dockerclient     *dockerclient.Client
29         containerID      string
30         doneIO           chan struct{}
31         errIO            error
32 }
33
34 func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
35         // API version 1.21 corresponds to Docker 1.9, which is
36         // currently the minimum version we want to support.
37         client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
38         if watchdogInterval < 1 {
39                 watchdogInterval = time.Minute
40         }
41         return &dockerExecutor{
42                 containerUUID:    containerUUID,
43                 logf:             logf,
44                 watchdogInterval: watchdogInterval,
45                 dockerclient:     client,
46         }, err
47 }
48
49 func (e *dockerExecutor) Runtime() string {
50         v, _ := e.dockerclient.ServerVersion(context.Background())
51         info := ""
52         for _, cv := range v.Components {
53                 if info != "" {
54                         info += ", "
55                 }
56                 info += cv.Name + " " + cv.Version
57         }
58         if info == "" {
59                 info = "(unknown version)"
60         }
61         return "docker " + info
62 }
63
64 func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
65         containerClient *arvados.Client) error {
66         _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
67         if err == nil {
68                 // already loaded
69                 return nil
70         }
71
72         f, err := os.Open(imageTarballPath)
73         if err != nil {
74                 return err
75         }
76         defer f.Close()
77         resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
78         if err != nil {
79                 return fmt.Errorf("While loading container image into Docker: %v", err)
80         }
81         defer resp.Body.Close()
82         buf, _ := ioutil.ReadAll(resp.Body)
83         e.logf("loaded image: response %s", buf)
84         return nil
85 }
86
87 func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, dockercontainer.HostConfig) {
88         e.logf("Creating Docker container")
89         cfg := dockercontainer.Config{
90                 Image:        spec.Image,
91                 Cmd:          spec.Command,
92                 WorkingDir:   spec.WorkingDir,
93                 Volumes:      map[string]struct{}{},
94                 OpenStdin:    spec.Stdin != nil,
95                 StdinOnce:    spec.Stdin != nil,
96                 AttachStdin:  spec.Stdin != nil,
97                 AttachStdout: true,
98                 AttachStderr: true,
99         }
100         if cfg.WorkingDir == "." {
101                 cfg.WorkingDir = ""
102         }
103         for k, v := range spec.Env {
104                 cfg.Env = append(cfg.Env, k+"="+v)
105         }
106         if spec.RAM > 0 && spec.RAM < minDockerRAM {
107                 spec.RAM = minDockerRAM
108         }
109         hostCfg := dockercontainer.HostConfig{
110                 LogConfig: dockercontainer.LogConfig{
111                         Type: "none",
112                 },
113                 NetworkMode: dockercontainer.NetworkMode("none"),
114                 Resources: dockercontainer.Resources{
115                         CgroupParent: spec.CgroupParent,
116                         NanoCPUs:     int64(spec.VCPUs) * 1000000000,
117                         Memory:       spec.RAM, // RAM
118                         MemorySwap:   spec.RAM, // RAM+swap
119                         KernelMemory: spec.RAM, // kernel portion
120                 },
121         }
122         if spec.CUDADeviceCount != 0 {
123                 var deviceIds []string
124                 if cudaVisibleDevices := os.Getenv("CUDA_VISIBLE_DEVICES"); cudaVisibleDevices != "" {
125                         // If a resource manager such as slurm or LSF told
126                         // us to select specific devices we need to propagate that.
127                         deviceIds = strings.Split(cudaVisibleDevices, ",")
128                 }
129
130                 deviceCount := spec.CUDADeviceCount
131                 if len(deviceIds) > 0 {
132                         // Docker won't accept both non-empty
133                         // DeviceIDs and a non-zero Count
134                         //
135                         // (it turns out "Count" is a dumb fallback
136                         // that just allocates device 0, 1, 2, ...,
137                         // Count-1)
138                         deviceCount = 0
139                 }
140
141                 // Capabilities are confusing.  The driver has generic
142                 // capabilities "gpu" and "nvidia" but then there's
143                 // additional capabilities "compute" and "utility"
144                 // that are passed to nvidia-container-cli.
145                 //
146                 // "compute" means include the CUDA libraries and
147                 // "utility" means include the CUDA utility programs
148                 // (like nvidia-smi).
149                 //
150                 // https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/daemon/nvidia_linux.go#L37
151                 // https://github.com/containerd/containerd/blob/main/contrib/nvidia/nvidia.go
152                 hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
153                         Driver:       "nvidia",
154                         Count:        deviceCount,
155                         DeviceIDs:    deviceIds,
156                         Capabilities: [][]string{[]string{"gpu", "nvidia", "compute", "utility"}},
157                 })
158         }
159         for path, mount := range spec.BindMounts {
160                 bind := mount.HostPath + ":" + path
161                 if mount.ReadOnly {
162                         bind += ":ro"
163                 }
164                 hostCfg.Binds = append(hostCfg.Binds, bind)
165         }
166         if spec.EnableNetwork {
167                 hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
168         }
169         return cfg, hostCfg
170 }
171
172 func (e *dockerExecutor) Create(spec containerSpec) error {
173         cfg, hostCfg := e.config(spec)
174         created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, e.containerUUID)
175         if err != nil {
176                 return fmt.Errorf("While creating container: %v", err)
177         }
178         e.containerID = created.ID
179         return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
180 }
181
182 func (e *dockerExecutor) CgroupID() string {
183         return e.containerID
184 }
185
186 func (e *dockerExecutor) Start() error {
187         return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockertypes.ContainerStartOptions{})
188 }
189
190 func (e *dockerExecutor) Stop() error {
191         err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
192         if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
193                 err = nil
194         }
195         return err
196 }
197
198 // Wait for the container to terminate, capture the exit code, and
199 // wait for stdout/stderr logging to finish.
200 func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
201         ctx, cancel := context.WithCancel(ctx)
202         defer cancel()
203         watchdogErr := make(chan error, 1)
204         go func() {
205                 ticker := time.NewTicker(e.watchdogInterval)
206                 defer ticker.Stop()
207                 for range ticker.C {
208                         dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
209                         ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
210                         dcancel()
211                         if ctx.Err() != nil {
212                                 // Either the container already
213                                 // exited, or our caller is trying to
214                                 // kill it.
215                                 return
216                         } else if err != nil {
217                                 e.logf("Error inspecting container: %s", err)
218                                 watchdogErr <- err
219                                 return
220                         } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
221                                 watchdogErr <- fmt.Errorf("Container is not running: State=%v", ctr.State)
222                                 return
223                         }
224                 }
225         }()
226
227         waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
228         for {
229                 select {
230                 case waitBody := <-waitOk:
231                         // wait for stdout/stderr to complete
232                         <-e.doneIO
233                         return int(waitBody.StatusCode), nil
234
235                 case err := <-waitErr:
236                         return -1, fmt.Errorf("container wait: %v", err)
237
238                 case <-ctx.Done():
239                         return -1, ctx.Err()
240
241                 case err := <-watchdogErr:
242                         return -1, err
243                 }
244         }
245 }
246
247 func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
248         resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
249                 Stream: true,
250                 Stdin:  stdin != nil,
251                 Stdout: true,
252                 Stderr: true,
253         })
254         if err != nil {
255                 return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
256         }
257         var errStdin error
258         if stdin != nil {
259                 go func() {
260                         errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
261                 }()
262         }
263         e.doneIO = make(chan struct{})
264         go func() {
265                 e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
266                 if e.errIO == nil && errStdin != nil {
267                         e.errIO = errStdin
268                 }
269                 close(e.doneIO)
270         }()
271         return nil
272 }
273
274 func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
275         defer closeConn()
276         _, err := io.Copy(conn, stdin)
277         if err != nil {
278                 return fmt.Errorf("While writing to docker container on stdin: %v", err)
279         }
280         return nil
281 }
282
283 // Handle docker log protocol; see
284 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
285 func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
286         header := make([]byte, 8)
287         var err error
288         for err == nil {
289                 _, err = io.ReadAtLeast(reader, header, 8)
290                 if err != nil {
291                         if err == io.EOF {
292                                 err = nil
293                         }
294                         break
295                 }
296                 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
297                 if header[0] == 1 {
298                         _, err = io.CopyN(stdout, reader, readsize)
299                 } else {
300                         // stderr
301                         _, err = io.CopyN(stderr, reader, readsize)
302                 }
303         }
304         if err != nil {
305                 return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
306         }
307         return nil
308 }
309
310 func (e *dockerExecutor) Close() {
311         e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
312 }