// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package crunchrun

import (
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"strings"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	dockertypes "github.com/docker/docker/api/types"
	dockercontainer "github.com/docker/docker/api/types/container"
	dockerclient "github.com/docker/docker/client"
	"golang.org/x/net/context"
)

// Docker daemon won't let you set a memory limit less than ~10 MiB.
const minDockerRAM = int64(16 * 1024 * 1024)
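// (config() below clamps a smaller nonzero RAM request up to this
// minimum, so container creation doesn't fail outright.)
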
type dockerExecutor struct {
	containerUUID    string
	logf             func(string, ...interface{})
	watchdogInterval time.Duration
	dockerclient     *dockerclient.Client
	containerID      string        // set by Create()
	doneIO           chan struct{} // closed when stdout/stderr copying finishes
	errIO            error         // error from the stdout/stderr copier, if any
}

func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
	// API version 1.21 corresponds to Docker 1.9, which is
	// currently the minimum version we want to support.
	client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
	if watchdogInterval < 1 {
		watchdogInterval = time.Minute
	}
	return &dockerExecutor{
		containerUUID:    containerUUID,
		logf:             logf,
		watchdogInterval: watchdogInterval,
		dockerclient:     client,
	}, err
}

func (e *dockerExecutor) Runtime() string { return "docker" }

func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
	containerClient *arvados.Client) error {
	_, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
	if err == nil {
		// image already loaded
		return nil
	}
	f, err := os.Open(imageTarballPath)
	if err != nil {
		return err
	}
	defer f.Close()
	resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
	if err != nil {
		return fmt.Errorf("While loading container image into Docker: %v", err)
	}
	defer resp.Body.Close()
	buf, _ := ioutil.ReadAll(resp.Body)
	e.logf("loaded image: response %s", buf)
	return nil
}

func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, dockercontainer.HostConfig) {
	e.logf("Creating Docker container")
	cfg := dockercontainer.Config{
		Image:        spec.Image,
		Cmd:          spec.Command,
		WorkingDir:   spec.WorkingDir,
		Volumes:      map[string]struct{}{},
		OpenStdin:    spec.Stdin != nil,
		StdinOnce:    spec.Stdin != nil,
		AttachStdin:  spec.Stdin != nil,
		AttachStdout: true,
		AttachStderr: true,
	}
	if cfg.WorkingDir == "." {
		cfg.WorkingDir = ""
	}
	for k, v := range spec.Env {
		cfg.Env = append(cfg.Env, k+"="+v)
	}
	if spec.RAM > 0 && spec.RAM < minDockerRAM {
		spec.RAM = minDockerRAM
	}
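	// Docker interprets MemorySwap as the combined RAM+swap
	// limit, so setting MemorySwap equal to Memory below gives
	// the container no swap at all.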
	hostCfg := dockercontainer.HostConfig{
		LogConfig: dockercontainer.LogConfig{
			Type: "none",
		},
		NetworkMode: dockercontainer.NetworkMode("none"),
		Resources: dockercontainer.Resources{
			CgroupParent: spec.CgroupParent,
			NanoCPUs:     int64(spec.VCPUs) * 1000000000,
			Memory:       spec.RAM, // RAM
			MemorySwap:   spec.RAM, // RAM+swap
			KernelMemory: spec.RAM, // kernel portion
		},
	}

	if spec.CUDADeviceCount != 0 {
		var deviceIds []string
		if cudaVisibleDevices := os.Getenv("CUDA_VISIBLE_DEVICES"); cudaVisibleDevices != "" {
			// If a resource manager such as slurm or LSF told
			// us to select specific devices, we need to propagate that.
			deviceIds = strings.Split(cudaVisibleDevices, ",")
		}

		deviceCount := spec.CUDADeviceCount
		if len(deviceIds) > 0 {
			// Docker won't accept both non-empty
			// DeviceIDs and a non-zero Count.
			//
			// (It turns out "Count" is a dumb fallback
			// that just allocates devices 0, 1, 2, ...,
			// Count-1.)
			deviceCount = 0
		}

		// Capabilities are confusing. The driver has generic
		// capabilities "gpu" and "nvidia", but then there are
		// additional capabilities "compute" and "utility"
		// that are passed to nvidia-container-cli.
		//
		// "compute" means include the CUDA libraries and
		// "utility" means include the CUDA utility programs
		// (like nvidia-smi).
		//
		// https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/daemon/nvidia_linux.go#L37
		// https://github.com/containerd/containerd/blob/main/contrib/nvidia/nvidia.go
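		//
		// For example, if slurm set CUDA_VISIBLE_DEVICES=2,3,
		// the request below ends up with DeviceIDs ["2", "3"]
		// and Count 0; with no environment hint it has empty
		// DeviceIDs and Count == spec.CUDADeviceCount.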
		hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
			Driver:       "nvidia",
			Count:        deviceCount,
			DeviceIDs:    deviceIds,
			Capabilities: [][]string{[]string{"gpu", "nvidia", "compute", "utility"}},
		})
	}
	for path, mount := range spec.BindMounts {
		bind := mount.HostPath + ":" + path
		if mount.ReadOnly {
			bind += ":ro"
		}
		hostCfg.Binds = append(hostCfg.Binds, bind)
	}
	if spec.EnableNetwork {
		hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
	}
	return cfg, hostCfg
}

func (e *dockerExecutor) Create(spec containerSpec) error {
	cfg, hostCfg := e.config(spec)
	created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, e.containerUUID)
	if err != nil {
		return fmt.Errorf("While creating container: %v", err)
	}
	e.containerID = created.ID
	return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
}

func (e *dockerExecutor) CgroupID() string {
	return e.containerID
}

func (e *dockerExecutor) Start() error {
	return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockertypes.ContainerStartOptions{})
}

func (e *dockerExecutor) Stop() error {
	err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
	if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
		// Already gone; not an error.
		err = nil
	}
	return err
}

// Wait for the container to terminate, capture the exit code, and
// wait for stdout/stderr logging to finish.
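//
// A watchdog goroutine polls ContainerInspect every
// watchdogInterval; if the container stops running (or disappears)
// without ContainerWait noticing, Wait returns an error instead of
// blocking forever.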
func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	watchdogErr := make(chan error, 1)
	go func() {
		ticker := time.NewTicker(e.watchdogInterval)
		defer ticker.Stop()
		for range ticker.C {
			dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
			ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
			dcancel()
			if ctx.Err() != nil {
				// Either the container already
				// exited, or our caller is trying to
				// kill it; stop the watchdog.
				return
			} else if err != nil {
				e.logf("Error inspecting container: %s", err)
				watchdogErr <- nil
				return
			} else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
				watchdogErr <- fmt.Errorf("Container is not running: State=%v", ctr.State)
				return
			}
		}
	}()

	waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
	for {
		select {
		case waitBody := <-waitOk:
			// wait for stdout/stderr to complete
			<-e.doneIO
			return int(waitBody.StatusCode), nil
		case err := <-waitErr:
			return -1, fmt.Errorf("container wait: %v", err)
		case err := <-watchdogErr:
			if err != nil {
				return -1, err
			}
		}
	}
}

func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
	resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
		Stream: true,
		Stdin:  stdin != nil,
		Stdout: true,
		Stderr: true,
	})
	if err != nil {
		return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
	}
	var errStdin error
	if stdin != nil {
		go func() {
			errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
		}()
	}
	e.doneIO = make(chan struct{})
	go func() {
		e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
		if e.errIO == nil && errStdin != nil {
			e.errIO = errStdin
		}
		close(e.doneIO)
	}()
	return nil
}

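// handleStdin copies stdin into the container's attach socket.
// Closing the write half of the hijacked connection (closeConn) is
// what delivers EOF on the container's stdin.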
func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
	defer closeConn()
	_, err := io.Copy(conn, stdin)
	if err != nil {
		return fmt.Errorf("While writing to docker container on stdin: %v", err)
	}
	return nil
}

// Handle docker log protocol; see
// https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
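//
// Each frame on the attach stream starts with an 8-byte header:
// byte 0 is the stream ID (1=stdout, 2=stderr), bytes 1-3 are
// unused, and bytes 4-7 give the payload size as a big-endian
// uint32. A payload of exactly that many bytes follows immediately.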
func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
	header := make([]byte, 8)
	var err error
	for err == nil {
		_, err = io.ReadAtLeast(reader, header, 8)
		if err != nil {
			if err == io.EOF {
				// normal end of stream
				err = nil
			}
			break
		}
		readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
		if header[0] == 1 {
			_, err = io.CopyN(stdout, reader, readsize)
		} else {
			_, err = io.CopyN(stderr, reader, readsize)
		}
	}
	if err != nil {
		return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
	}
	return nil
}

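// Close force-removes the container; any error from the Docker
// daemon is ignored here, since by this point the container is
// being discarded anyway.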
func (e *dockerExecutor) Close() {
	e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
}