1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 dockertypes "github.com/docker/docker/api/types"
15 dockercontainer "github.com/docker/docker/api/types/container"
16 dockerclient "github.com/docker/docker/client"
17 "golang.org/x/net/context"
// minDockerRAM is the smallest memory limit, in bytes (16 MiB), that
// this executor requests for a container: Create raises spec.RAM to
// this value when the spec asks for less.
20 // Docker daemon won't let you set a limit less than ~10 MiB
21 const minDockerRAM = int64(16 * 1024 * 1024)
// dockerExecutor creates, starts, waits for and removes a container
// through the Docker daemon API.
23 type dockerExecutor struct {
// logf is the printf-style callback used for progress and error
// messages.
25 logf func(string, ...interface{})
// watchdogInterval is the period of the ticker created in Wait and
// the deadline applied to each ContainerInspect call made there.
26 watchdogInterval time.Duration
// dockerclient is the Docker API client used for every daemon call.
27 dockerclient *dockerclient.Client
// newDockerExecutor builds a dockerExecutor for the container
// identified by containerUUID, talking to the daemon at
// dockerclient.DefaultDockerHost. logf is a printf-style logging
// callback. A watchdogInterval below 1 is replaced with one minute.
//
// NOTE(review): confirm that err from dockerclient.NewClient is
// propagated to the caller alongside the returned executor.
33 func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
34 // API version 1.21 corresponds to Docker 1.9, which is
35 // currently the minimum version we want to support.
36 client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
// Fall back to a one-minute watchdog when no positive interval was
// supplied.
37 if watchdogInterval < 1 {
38 watchdogInterval = time.Minute
40 return &dockerExecutor{
41 containerUUID: containerUUID,
43 watchdogInterval: watchdogInterval,
// ImageLoaded asks the Docker daemon to inspect imageID.
//
// NOTE(review): presumably returns true exactly when the inspect call
// succeeds (err == nil) -- confirm against the return statement.
48 func (e *dockerExecutor) ImageLoaded(imageID string) bool {
49 _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
// LoadImage opens the file at filename and streams its contents to
// the Docker daemon with ImageLoad. A failure reported by ImageLoad is
// returned wrapped with "While loading container image into Docker".
// On success the daemon's response body is read in full and logged.
53 func (e *dockerExecutor) LoadImage(filename string) error {
54 f, err := os.Open(filename)
59 resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
61 return fmt.Errorf("While loading container image into Docker: %v", err)
63 defer resp.Body.Close()
// NOTE(review): the error from ReadAll is discarded, so a partially
// read response is logged as if it were complete.
64 buf, _ := ioutil.ReadAll(resp.Body)
65 e.logf("loaded image: response %s", buf)
// Create asks the Docker daemon to create a container described by
// spec, stores the resulting container ID in e.containerID, and then
// attaches the spec's stdin/stdout/stderr streams via startIO.
//
// A ContainerCreate failure is returned wrapped with "While creating
// container"; otherwise the result of startIO is returned.
69 func (e *dockerExecutor) Create(spec containerSpec) error {
70 e.logf("Creating Docker container")
71 cfg := dockercontainer.Config{
74 WorkingDir: spec.WorkingDir,
75 Volumes: map[string]struct{}{},
// Stdin is opened and attached only when the spec supplies one.
76 OpenStdin: spec.Stdin != nil,
77 StdinOnce: spec.Stdin != nil,
78 AttachStdin: spec.Stdin != nil,
82 if cfg.WorkingDir == "." {
// Environment variables are passed as "KEY=value" strings.
85 for k, v := range spec.Env {
86 cfg.Env = append(cfg.Env, k+"="+v)
// Never request less memory than minDockerRAM.
88 if spec.RAM < minDockerRAM {
89 spec.RAM = minDockerRAM
91 hostCfg := dockercontainer.HostConfig{
92 LogConfig: dockercontainer.LogConfig{
// Networking defaults to "none"; it is replaced with
// spec.NetworkMode further down when spec.EnableNetwork is set.
95 NetworkMode: dockercontainer.NetworkMode("none"),
96 Resources: dockercontainer.Resources{
97 CgroupParent: spec.CgroupParent,
// spec.VCPUs is scaled by 1e9 to express it in NanoCPUs.
98 NanoCPUs: int64(spec.VCPUs) * 1000000000,
99 Memory: spec.RAM, // RAM
100 MemorySwap: spec.RAM, // RAM+swap
101 KernelMemory: spec.RAM, // kernel portion
// Each bind starts as "<host path>:<container path>", where the
// container path is the map key.
104 for path, mount := range spec.BindMounts {
105 bind := mount.HostPath + ":" + path
109 hostCfg.Binds = append(hostCfg.Binds, bind)
111 if spec.EnableNetwork {
112 hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
// The executor's container UUID is passed as the last argument
// (the container name in the Docker client API -- NOTE(review):
// confirm against the vendored client version).
115 created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, e.containerUUID)
117 return fmt.Errorf("While creating container: %v", err)
119 e.containerID = created.ID
120 return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
// CgroupID returns a string identifying the container's cgroup.
//
// NOTE(review): presumably this is the Docker container ID stored by
// Create -- confirm against the method body.
123 func (e *dockerExecutor) CgroupID() string {
// Start asks the Docker daemon to start the container whose ID is in
// e.containerID, using default start options, and returns the
// daemon's error unchanged.
127 func (e *dockerExecutor) Start() error {
128 return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockertypes.ContainerStartOptions{})
// Stop force-removes the container whose ID is in e.containerID.
131 func (e *dockerExecutor) Stop() error {
132 err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
// An error whose text contains "No such container: <id>" is
// special-cased by matching on the message.
// NOTE(review): presumably this case is treated as success because the
// container is already gone -- confirm. Matching on message text is
// fragile if the daemon's wording changes.
133 if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
139 // Wait for the container to terminate, capture the exit code, and
140 // wait for stdout/stderr logging to finish.
//
// While waiting, a watchdog driven by a ticker with period
// e.watchdogInterval inspects the container and reports an error if
// it is neither running nor in the "created" state. When the daemon
// reports the container's exit, Wait returns its status code. If
// ContainerWait itself fails, Wait returns -1 and the error wrapped
// with "container wait: ".
141 func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
142 ctx, cancel := context.WithCancel(ctx)
// Buffered (capacity 1) so a single watchdog error can be queued
// without a receiver being ready.
144 watchdogErr := make(chan error, 1)
146 ticker := time.NewTicker(e.watchdogInterval)
// Each inspect call gets its own deadline of one watchdog interval.
149 dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
150 ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
152 if ctx.Err() != nil {
153 // Either the container already
154 // exited, or our caller is trying to
// An inspect failure is only logged here.
157 } else if err != nil {
158 e.logf("Error inspecting container: %s", err)
// Neither running nor in the "created" state: report it through the
// watchdog channel.
161 } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
162 watchdogErr <- fmt.Errorf("Container is not running: State=%v", ctr.State)
// Ask the daemon to notify us once the container is no longer running.
168 waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
171 case waitBody := <-waitOk:
172 e.logf("Container exited with code: %v", waitBody.StatusCode)
173 // wait for stdout/stderr to complete
175 return int(waitBody.StatusCode), nil
177 case err := <-waitErr:
178 return -1, fmt.Errorf("container wait: %v", err)
183 case err := <-watchdogErr:
// startIO attaches to the container identified by e.containerID and
// connects the attached streams to the given stdin, stdout and stderr.
// Stdin copying is delegated to handleStdin and output copying to
// handleStdoutStderr. e.doneIO is created here and e.errIO receives
// the result of output handling.
//
// An attach failure is returned wrapped with "error attaching
// container stdin/stdout/stderr streams".
189 func (e *dockerExecutor) startIO(stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
190 resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
197 return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
202 errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
205 e.doneIO = make(chan struct{})
207 e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
// A stdin error is only considered when output handling reported
// no error of its own.
208 if e.errIO == nil && errStdin != nil {
// handleStdin copies everything readable from stdin to conn. A copy
// failure is returned wrapped with "While writing to docker container
// on stdin".
//
// NOTE(review): closeConn is presumably invoked so the container sees
// end-of-input -- confirm where stdin and the connection are closed.
216 func (e *dockerExecutor) handleStdin(stdin io.ReadCloser, conn io.Writer, closeConn func() error) error {
219 _, err := io.Copy(conn, stdin)
221 return fmt.Errorf("While writing to docker container on stdin: %v", err)
226 // Handle docker log protocol; see
227 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
//
// The stream read from reader is a sequence of frames. Each frame
// starts with an 8-byte header whose bytes 4..7 hold the payload
// length as a big-endian 32-bit integer. That many bytes are then
// copied to either stdout or stderr.
// NOTE(review): presumably header[0] selects the destination stream --
// confirm against the dispatch code.
//
// Copy failures are returned wrapped with "error copying stdout/stderr
// from docker"; failures closing stdout or stderr are returned wrapped
// with "error writing stdout: close" / "error writing stderr: close".
228 func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.WriteCloser, reader io.Reader) error {
229 header := make([]byte, 8)
// Read one complete 8-byte frame header.
232 _, err = io.ReadAtLeast(reader, header, 8)
// Payload length: big-endian value assembled from header[4..7].
239 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
241 _, err = io.CopyN(stdout, reader, readsize)
244 _, err = io.CopyN(stderr, reader, readsize)
248 return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
252 return fmt.Errorf("error writing stdout: close: %v", err)
256 return fmt.Errorf("error writing stderr: close: %v", err)
// Close force-removes the container whose ID is in e.containerID.
// Any error returned by the daemon is ignored.
261 func (e *dockerExecutor) Close() {
262 e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})