Merge branch '17394-collectionfs-storage-class' into main
[arvados.git] / lib / crunchrun / docker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4 package crunchrun
5
6 import (
7         "fmt"
8         "io"
9         "io/ioutil"
10         "os"
11         "strings"
12         "time"
13
14         dockertypes "github.com/docker/docker/api/types"
15         dockercontainer "github.com/docker/docker/api/types/container"
16         dockerclient "github.com/docker/docker/client"
17         "golang.org/x/net/context"
18 )
19
20 // Docker daemon won't let you set a limit less than ~10 MiB
21 const minDockerRAM = int64(16 * 1024 * 1024)
22
23 type dockerExecutor struct {
24         containerUUID    string
25         logf             func(string, ...interface{})
26         watchdogInterval time.Duration
27         dockerclient     *dockerclient.Client
28         containerID      string
29         doneIO           chan struct{}
30         errIO            error
31 }
32
33 func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
34         // API version 1.21 corresponds to Docker 1.9, which is
35         // currently the minimum version we want to support.
36         client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
37         if watchdogInterval < 1 {
38                 watchdogInterval = time.Minute
39         }
40         return &dockerExecutor{
41                 containerUUID:    containerUUID,
42                 logf:             logf,
43                 watchdogInterval: watchdogInterval,
44                 dockerclient:     client,
45         }, err
46 }
47
48 func (e *dockerExecutor) ImageLoaded(imageID string) bool {
49         _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
50         return err == nil
51 }
52
53 func (e *dockerExecutor) LoadImage(filename string) error {
54         f, err := os.Open(filename)
55         if err != nil {
56                 return err
57         }
58         defer f.Close()
59         resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
60         if err != nil {
61                 return fmt.Errorf("While loading container image into Docker: %v", err)
62         }
63         defer resp.Body.Close()
64         buf, _ := ioutil.ReadAll(resp.Body)
65         e.logf("loaded image: response %s", buf)
66         return nil
67 }
68
69 func (e *dockerExecutor) Create(spec containerSpec) error {
70         e.logf("Creating Docker container")
71         cfg := dockercontainer.Config{
72                 Image:        spec.Image,
73                 Cmd:          spec.Command,
74                 WorkingDir:   spec.WorkingDir,
75                 Volumes:      map[string]struct{}{},
76                 OpenStdin:    spec.Stdin != nil,
77                 StdinOnce:    spec.Stdin != nil,
78                 AttachStdin:  spec.Stdin != nil,
79                 AttachStdout: true,
80                 AttachStderr: true,
81         }
82         if cfg.WorkingDir == "." {
83                 cfg.WorkingDir = ""
84         }
85         for k, v := range spec.Env {
86                 cfg.Env = append(cfg.Env, k+"="+v)
87         }
88         if spec.RAM > 0 && spec.RAM < minDockerRAM {
89                 spec.RAM = minDockerRAM
90         }
91         hostCfg := dockercontainer.HostConfig{
92                 LogConfig: dockercontainer.LogConfig{
93                         Type: "none",
94                 },
95                 NetworkMode: dockercontainer.NetworkMode("none"),
96                 Resources: dockercontainer.Resources{
97                         CgroupParent: spec.CgroupParent,
98                         NanoCPUs:     int64(spec.VCPUs) * 1000000000,
99                         Memory:       spec.RAM, // RAM
100                         MemorySwap:   spec.RAM, // RAM+swap
101                         KernelMemory: spec.RAM, // kernel portion
102                 },
103         }
104         for path, mount := range spec.BindMounts {
105                 bind := mount.HostPath + ":" + path
106                 if mount.ReadOnly {
107                         bind += ":ro"
108                 }
109                 hostCfg.Binds = append(hostCfg.Binds, bind)
110         }
111         if spec.EnableNetwork {
112                 hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
113         }
114
115         created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, e.containerUUID)
116         if err != nil {
117                 return fmt.Errorf("While creating container: %v", err)
118         }
119         e.containerID = created.ID
120         return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
121 }
122
123 func (e *dockerExecutor) CgroupID() string {
124         return e.containerID
125 }
126
127 func (e *dockerExecutor) Start() error {
128         return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockertypes.ContainerStartOptions{})
129 }
130
131 func (e *dockerExecutor) Stop() error {
132         err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
133         if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
134                 err = nil
135         }
136         return err
137 }
138
139 // Wait for the container to terminate, capture the exit code, and
140 // wait for stdout/stderr logging to finish.
141 func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
142         ctx, cancel := context.WithCancel(ctx)
143         defer cancel()
144         watchdogErr := make(chan error, 1)
145         go func() {
146                 ticker := time.NewTicker(e.watchdogInterval)
147                 defer ticker.Stop()
148                 for range ticker.C {
149                         dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
150                         ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
151                         dcancel()
152                         if ctx.Err() != nil {
153                                 // Either the container already
154                                 // exited, or our caller is trying to
155                                 // kill it.
156                                 return
157                         } else if err != nil {
158                                 e.logf("Error inspecting container: %s", err)
159                                 watchdogErr <- err
160                                 return
161                         } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
162                                 watchdogErr <- fmt.Errorf("Container is not running: State=%v", ctr.State)
163                                 return
164                         }
165                 }
166         }()
167
168         waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
169         for {
170                 select {
171                 case waitBody := <-waitOk:
172                         e.logf("Container exited with code: %v", waitBody.StatusCode)
173                         // wait for stdout/stderr to complete
174                         <-e.doneIO
175                         return int(waitBody.StatusCode), nil
176
177                 case err := <-waitErr:
178                         return -1, fmt.Errorf("container wait: %v", err)
179
180                 case <-ctx.Done():
181                         return -1, ctx.Err()
182
183                 case err := <-watchdogErr:
184                         return -1, err
185                 }
186         }
187 }
188
189 func (e *dockerExecutor) startIO(stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
190         resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
191                 Stream: true,
192                 Stdin:  stdin != nil,
193                 Stdout: true,
194                 Stderr: true,
195         })
196         if err != nil {
197                 return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
198         }
199         var errStdin error
200         if stdin != nil {
201                 go func() {
202                         errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
203                 }()
204         }
205         e.doneIO = make(chan struct{})
206         go func() {
207                 e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
208                 if e.errIO == nil && errStdin != nil {
209                         e.errIO = errStdin
210                 }
211                 close(e.doneIO)
212         }()
213         return nil
214 }
215
216 func (e *dockerExecutor) handleStdin(stdin io.ReadCloser, conn io.Writer, closeConn func() error) error {
217         defer stdin.Close()
218         defer closeConn()
219         _, err := io.Copy(conn, stdin)
220         if err != nil {
221                 return fmt.Errorf("While writing to docker container on stdin: %v", err)
222         }
223         return nil
224 }
225
226 // Handle docker log protocol; see
227 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
228 func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.WriteCloser, reader io.Reader) error {
229         header := make([]byte, 8)
230         var err error
231         for err == nil {
232                 _, err = io.ReadAtLeast(reader, header, 8)
233                 if err != nil {
234                         if err == io.EOF {
235                                 err = nil
236                         }
237                         break
238                 }
239                 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
240                 if header[0] == 1 {
241                         _, err = io.CopyN(stdout, reader, readsize)
242                 } else {
243                         // stderr
244                         _, err = io.CopyN(stderr, reader, readsize)
245                 }
246         }
247         if err != nil {
248                 return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
249         }
250         err = stdout.Close()
251         if err != nil {
252                 return fmt.Errorf("error writing stdout: close: %v", err)
253         }
254         err = stderr.Close()
255         if err != nil {
256                 return fmt.Errorf("error writing stderr: close: %v", err)
257         }
258         return nil
259 }
260
261 func (e *dockerExecutor) Close() {
262         e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
263 }