12630: Call nvidia-modprobe, support CUDA_VISIBLE_DEVICES
[arvados.git] / lib / crunchrun / docker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4 package crunchrun
5
6 import (
7         "fmt"
8         "io"
9         "io/ioutil"
10         "os"
11         "strings"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         dockertypes "github.com/docker/docker/api/types"
16         dockercontainer "github.com/docker/docker/api/types/container"
17         dockerclient "github.com/docker/docker/client"
18         "golang.org/x/net/context"
19 )
20
21 // Docker daemon won't let you set a limit less than ~10 MiB
22 const minDockerRAM = int64(16 * 1024 * 1024)
23
24 type dockerExecutor struct {
25         containerUUID    string
26         logf             func(string, ...interface{})
27         watchdogInterval time.Duration
28         dockerclient     *dockerclient.Client
29         containerID      string
30         doneIO           chan struct{}
31         errIO            error
32 }
33
34 func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
35         // API version 1.21 corresponds to Docker 1.9, which is
36         // currently the minimum version we want to support.
37         client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
38         if watchdogInterval < 1 {
39                 watchdogInterval = time.Minute
40         }
41         return &dockerExecutor{
42                 containerUUID:    containerUUID,
43                 logf:             logf,
44                 watchdogInterval: watchdogInterval,
45                 dockerclient:     client,
46         }, err
47 }
48
49 func (e *dockerExecutor) Runtime() string { return "docker" }
50
51 func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
52         containerClient *arvados.Client) error {
53         _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
54         if err == nil {
55                 // already loaded
56                 return nil
57         }
58
59         f, err := os.Open(imageTarballPath)
60         if err != nil {
61                 return err
62         }
63         defer f.Close()
64         resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
65         if err != nil {
66                 return fmt.Errorf("While loading container image into Docker: %v", err)
67         }
68         defer resp.Body.Close()
69         buf, _ := ioutil.ReadAll(resp.Body)
70         e.logf("loaded image: response %s", buf)
71         return nil
72 }
73
74 func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, dockercontainer.HostConfig) {
75         e.logf("Creating Docker container")
76         cfg := dockercontainer.Config{
77                 Image:        spec.Image,
78                 Cmd:          spec.Command,
79                 WorkingDir:   spec.WorkingDir,
80                 Volumes:      map[string]struct{}{},
81                 OpenStdin:    spec.Stdin != nil,
82                 StdinOnce:    spec.Stdin != nil,
83                 AttachStdin:  spec.Stdin != nil,
84                 AttachStdout: true,
85                 AttachStderr: true,
86         }
87         if cfg.WorkingDir == "." {
88                 cfg.WorkingDir = ""
89         }
90         for k, v := range spec.Env {
91                 cfg.Env = append(cfg.Env, k+"="+v)
92         }
93         if spec.RAM > 0 && spec.RAM < minDockerRAM {
94                 spec.RAM = minDockerRAM
95         }
96         hostCfg := dockercontainer.HostConfig{
97                 LogConfig: dockercontainer.LogConfig{
98                         Type: "none",
99                 },
100                 NetworkMode: dockercontainer.NetworkMode("none"),
101                 Resources: dockercontainer.Resources{
102                         CgroupParent: spec.CgroupParent,
103                         NanoCPUs:     int64(spec.VCPUs) * 1000000000,
104                         Memory:       spec.RAM, // RAM
105                         MemorySwap:   spec.RAM, // RAM+swap
106                         KernelMemory: spec.RAM, // kernel portion
107                 },
108         }
109         if spec.CUDADeviceCount != 0 {
110                 var deviceIds []string
111                 for _, s := range os.Environ() {
112                         // If a resource manager such as slurm or LSF told
113                         // us to select specific devices we need to propagate that.
114                         if strings.HasPrefix(s, "CUDA_VISIBLE_DEVICES=") {
115                                 deviceIds = strings.SplitN(strings.SplitN(s, "=", 2)[1], ",")
116                         }
117                 }
118                 deviceCount := spec.CUDADeviceCount
119                 if len(deviceIds) > 0 {
120                         // Docker won't accept both non-empty
121                         // DeviceIDs and a non-zero Count
122                         deviceCount = 0
123                 }
124
125                 hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
126                         Driver:       "nvidia",
127                         Count:        deviceCount,
128                         DeviceIDs:    deviceIds,
129                         Capabilities: [][]string{[]string{"gpu", "nvidia"}},
130                 })
131         }
132         for path, mount := range spec.BindMounts {
133                 bind := mount.HostPath + ":" + path
134                 if mount.ReadOnly {
135                         bind += ":ro"
136                 }
137                 hostCfg.Binds = append(hostCfg.Binds, bind)
138         }
139         if spec.EnableNetwork {
140                 hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
141         }
142         return cfg, hostCfg
143 }
144
145 func (e *dockerExecutor) Create(spec containerSpec) error {
146         cfg, hostCfg := e.config(spec)
147         created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, e.containerUUID)
148         if err != nil {
149                 return fmt.Errorf("While creating container: %v", err)
150         }
151         e.containerID = created.ID
152         return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
153 }
154
155 func (e *dockerExecutor) CgroupID() string {
156         return e.containerID
157 }
158
159 func (e *dockerExecutor) Start() error {
160         return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockertypes.ContainerStartOptions{})
161 }
162
163 func (e *dockerExecutor) Stop() error {
164         err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
165         if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
166                 err = nil
167         }
168         return err
169 }
170
171 // Wait for the container to terminate, capture the exit code, and
172 // wait for stdout/stderr logging to finish.
173 func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
174         ctx, cancel := context.WithCancel(ctx)
175         defer cancel()
176         watchdogErr := make(chan error, 1)
177         go func() {
178                 ticker := time.NewTicker(e.watchdogInterval)
179                 defer ticker.Stop()
180                 for range ticker.C {
181                         dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
182                         ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
183                         dcancel()
184                         if ctx.Err() != nil {
185                                 // Either the container already
186                                 // exited, or our caller is trying to
187                                 // kill it.
188                                 return
189                         } else if err != nil {
190                                 e.logf("Error inspecting container: %s", err)
191                                 watchdogErr <- err
192                                 return
193                         } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
194                                 watchdogErr <- fmt.Errorf("Container is not running: State=%v", ctr.State)
195                                 return
196                         }
197                 }
198         }()
199
200         waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
201         for {
202                 select {
203                 case waitBody := <-waitOk:
204                         e.logf("Container exited with code: %v", waitBody.StatusCode)
205                         // wait for stdout/stderr to complete
206                         <-e.doneIO
207                         return int(waitBody.StatusCode), nil
208
209                 case err := <-waitErr:
210                         return -1, fmt.Errorf("container wait: %v", err)
211
212                 case <-ctx.Done():
213                         return -1, ctx.Err()
214
215                 case err := <-watchdogErr:
216                         return -1, err
217                 }
218         }
219 }
220
221 func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
222         resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
223                 Stream: true,
224                 Stdin:  stdin != nil,
225                 Stdout: true,
226                 Stderr: true,
227         })
228         if err != nil {
229                 return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
230         }
231         var errStdin error
232         if stdin != nil {
233                 go func() {
234                         errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
235                 }()
236         }
237         e.doneIO = make(chan struct{})
238         go func() {
239                 e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
240                 if e.errIO == nil && errStdin != nil {
241                         e.errIO = errStdin
242                 }
243                 close(e.doneIO)
244         }()
245         return nil
246 }
247
248 func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
249         defer closeConn()
250         _, err := io.Copy(conn, stdin)
251         if err != nil {
252                 return fmt.Errorf("While writing to docker container on stdin: %v", err)
253         }
254         return nil
255 }
256
257 // Handle docker log protocol; see
258 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
259 func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
260         header := make([]byte, 8)
261         var err error
262         for err == nil {
263                 _, err = io.ReadAtLeast(reader, header, 8)
264                 if err != nil {
265                         if err == io.EOF {
266                                 err = nil
267                         }
268                         break
269                 }
270                 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
271                 if header[0] == 1 {
272                         _, err = io.CopyN(stdout, reader, readsize)
273                 } else {
274                         // stderr
275                         _, err = io.CopyN(stderr, reader, readsize)
276                 }
277         }
278         if err != nil {
279                 return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
280         }
281         return nil
282 }
283
284 func (e *dockerExecutor) Close() {
285         e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
286 }