// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package crunchrun

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/exec"
	"slices"
	"strconv"
	"strings"
	"sync/atomic"
	"syscall"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	dockercontainer "github.com/docker/docker/api/types/container"
	dockerclient "github.com/docker/docker/client"
	"github.com/docker/docker/pkg/jsonmessage"
)

// Docker daemon won't let you set a limit less than ~10 MiB
const minDockerRAM = int64(16 * 1024 * 1024)

// DockerAPIVersion is the API version we use to communicate with the
// docker service. The oldest OS we support is Ubuntu 18.04 (bionic),
// which originally shipped docker.io 17.12 / API 1.35, so there is no
// reason to use an older API version. See
// https://dev.arvados.org/issues/15370#note-38 and
// https://docs.docker.com/engine/api/.
const DockerAPIVersion = "1.35"

// Number of consecutive "inspect container" failures before
// concluding Docker is unresponsive, giving up, and cancelling the
// container.
const dockerWatchdogThreshold = 5
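
// With the default 2-minute watchdog interval, this threshold means
// crunch-run gives up after roughly 10 minutes (5 consecutive failed
// inspections) without a successful one.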

type dockerExecutor struct {
	containerUUID    string
	logf             func(string, ...interface{})
	watchdogInterval time.Duration
	dockerclient     *dockerclient.Client
	containerID      string
	savedIPAddress   atomic.Value
	doneIO           chan struct{}
	errIO            error
}

func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
	// Connect using the pinned API version (DockerAPIVersion
	// above) rather than negotiating one with the daemon.
	client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, DockerAPIVersion, nil, nil)
	if watchdogInterval < 1 {
		watchdogInterval = time.Minute * 2
	}
	return &dockerExecutor{
		containerUUID:    containerUUID,
		logf:             logf,
		watchdogInterval: watchdogInterval,
		dockerclient:     client,
	}, err
}
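
// A minimal usage sketch (hypothetical caller, for illustration only;
// the container UUID below is made up). Passing 0 selects the default
// 2-minute watchdog interval:
//
//	e, err := newDockerExecutor("zzzzz-dz642-xxxxxxxxxxxxxxx", log.Printf, 0)
//	if err != nil {
//		log.Fatal(err)
//	}
//	log.Println(e.Runtime()) // e.g. "docker Engine 24.0.5, containerd 1.7.2"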

func (e *dockerExecutor) Runtime() string {
	v, _ := e.dockerclient.ServerVersion(context.Background())
	info := ""
	for _, cv := range v.Components {
		if info != "" {
			info += ", "
		}
		info += cv.Name + " " + cv.Version
	}
	if info == "" {
		info = "(unknown version)"
	}
	return "docker " + info
}

func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
	containerClient *arvados.Client) error {
	_, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
	if err == nil {
		return nil // image is already loaded
	}
	f, err := os.Open(imageTarballPath)
	if err != nil {
		return err
	}
	defer f.Close()
	resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
	if err != nil {
		return fmt.Errorf("ImageLoad: %w", err)
	}
	defer resp.Body.Close()
	var message jsonmessage.JSONMessage
	err = json.NewDecoder(resp.Body).Decode(&message)
	if err != nil {
		return fmt.Errorf("could not parse Docker response: %w", err)
	}
	if message.Error != nil {
		return fmt.Errorf("ImageLoad: %w", message.Error)
	}
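	// The response body is a single JSON message: on success it
	// carries a "stream" field, on failure an "errorDetail" object,
	// which jsonmessage surfaces as message.Error above.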
	// message.Stream is typically "Loaded image: hello-world:latest\n"
	e.logf("%s", strings.TrimSuffix(message.Stream, "\n"))
	return nil
}

func (e *dockerExecutor) config(spec containerSpec) (dockercontainer.Config, dockercontainer.HostConfig) {
	e.logf("Creating Docker container")
	cfg := dockercontainer.Config{
		Image:        spec.Image,
		Cmd:          spec.Command,
		WorkingDir:   spec.WorkingDir,
		Volumes:      map[string]struct{}{},
		OpenStdin:    spec.Stdin != nil,
		StdinOnce:    spec.Stdin != nil,
		AttachStdin:  spec.Stdin != nil,
		AttachStdout: true,
		AttachStderr: true,
	}
	if cfg.WorkingDir == "." {
		cfg.WorkingDir = ""
	}
	for k, v := range spec.Env {
		cfg.Env = append(cfg.Env, k+"="+v)
	}
	if spec.RAM > 0 && spec.RAM < minDockerRAM {
		spec.RAM = minDockerRAM
	}
	hostCfg := dockercontainer.HostConfig{
		LogConfig: dockercontainer.LogConfig{
			Type: "none",
		},
		NetworkMode: dockercontainer.NetworkMode("none"),
		Resources: dockercontainer.Resources{
			CgroupParent: spec.CgroupParent,
			NanoCPUs:     int64(spec.VCPUs) * 1000000000,
			Memory:       spec.RAM, // RAM
			MemorySwap:   spec.RAM, // RAM+swap
			KernelMemory: spec.RAM, // kernel portion
		},
	}
	if spec.GPUStack == "cuda" && spec.GPUDeviceCount > 0 {
		var deviceIds []string
		if cudaVisibleDevices := os.Getenv("CUDA_VISIBLE_DEVICES"); cudaVisibleDevices != "" {
			// If a resource manager such as slurm or LSF told
			// us to select specific devices, we need to propagate that.
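			// e.g. CUDA_VISIBLE_DEVICES="1,3" means use only
			// the devices with indexes 1 and 3.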
			deviceIds = strings.Split(cudaVisibleDevices, ",")
		}

		deviceCount := spec.GPUDeviceCount
		if len(deviceIds) > 0 {
			// Docker won't accept both non-empty
			// DeviceIDs and a non-zero Count
			//
			// (it turns out "Count" is a dumb fallback
			// that just allocates device 0, 1, 2, ...,
			// Count-1)
			deviceCount = 0
		}

		// Capabilities are confusing. The driver has generic
		// capabilities "gpu" and "nvidia" but then there are
		// additional capabilities "compute" and "utility"
		// that are passed to nvidia-container-cli.
		//
		// "compute" means include the CUDA libraries and
		// "utility" means include the CUDA utility programs
		// (like nvidia-smi).
		//
		// https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/daemon/nvidia_linux.go#L37
		// https://github.com/containerd/containerd/blob/main/contrib/nvidia/nvidia.go
		hostCfg.Resources.DeviceRequests = append(hostCfg.Resources.DeviceRequests, dockercontainer.DeviceRequest{
			Driver:       "nvidia",
			Count:        deviceCount,
			DeviceIDs:    deviceIds,
			Capabilities: [][]string{[]string{"gpu", "nvidia", "compute", "utility"}},
		})
	}
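	// For illustration only (not from the original source): the
	// DeviceRequest above is roughly what the docker CLI sets up for
	// a flag like --gpus '"device=1,3"', with the capability list
	// spelled out explicitly.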
	if spec.GPUStack == "rocm" && spec.GPUDeviceCount > 0 {
		// There's no container toolkit or builtin Docker
		// support for ROCm, so we just provide the devices to
		// the container ourselves.

		// Fortunately, the minimal docker invocation for this
		// seems to be:
		//
		//	rendergroup=$(getent group render | cut -d: -f3)
		//	videogroup=$(getent group video | cut -d: -f3)
		//	docker run -it --device=/dev/kfd --device=/dev/dri/renderD128 --user $(id -u) --group-add $videogroup --group-add $rendergroup "$@"
		hostCfg.Devices = append(hostCfg.Devices, dockercontainer.DeviceMapping{
			PathInContainer:   "/dev/kfd",
			PathOnHost:        "/dev/kfd",
			CgroupPermissions: "rwm",
		})
		info, err := os.Stat("/dev/kfd")
		if err == nil {
			if stat, ok := info.Sys().(*syscall.Stat_t); ok {
				// Make sure the container has access
				// to the group id that allows it to
				// access the device.
				hostCfg.GroupAdd = append(hostCfg.GroupAdd, fmt.Sprintf("%v", stat.Gid))
			}
		}

		var deviceIndexes []int
		if amdVisibleDevices := os.Getenv("AMD_VISIBLE_DEVICES"); amdVisibleDevices != "" {
			// A resource manager/dispatcher told us to
			// select specific devices, so we need to
			// propagate that.
			for _, dev := range strings.Split(amdVisibleDevices, ",") {
				intDev, err := strconv.Atoi(dev)
				if err != nil {
					continue
				}
				deviceIndexes = append(deviceIndexes, intDev)
			}
		} else {
			// Try every device; we'll check below to see
			// which ones actually exist.
			for i := 0; i < 128; i++ {
				deviceIndexes = append(deviceIndexes, i)
			}
		}
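		// DRM render nodes are numbered starting at 128, so GPU
		// index i corresponds to /dev/dri/renderD(128+i).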
		for _, intDev := range deviceIndexes {
			devPath := fmt.Sprintf("/dev/dri/renderD%v", 128+intDev)
			info, err := os.Stat(devPath)
			if err != nil {
				continue
			}
			hostCfg.Devices = append(hostCfg.Devices, dockercontainer.DeviceMapping{
				PathInContainer:   devPath,
				PathOnHost:        devPath,
				CgroupPermissions: "rwm",
			})
			if stat, ok := info.Sys().(*syscall.Stat_t); ok {
				// Make sure the container has access
				// to the group id that allows it to
				// access the device.
				if !slices.Contains(hostCfg.GroupAdd, fmt.Sprintf("%v", stat.Gid)) {
					hostCfg.GroupAdd = append(hostCfg.GroupAdd, fmt.Sprintf("%v", stat.Gid))
				}
			}
		}
	}
	for path, mount := range spec.BindMounts {
		bind := mount.HostPath + ":" + path
		if mount.ReadOnly {
			bind += ":ro"
		}
		hostCfg.Binds = append(hostCfg.Binds, bind)
	}
	if spec.EnableNetwork {
		hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
	}
	return cfg, hostCfg
}

func (e *dockerExecutor) Create(spec containerSpec) error {
	cfg, hostCfg := e.config(spec)
	created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, nil, e.containerUUID)
	if err != nil {
		return fmt.Errorf("While creating container: %v", err)
	}
	e.containerID = created.ID
	return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
}

func (e *dockerExecutor) Pid() int {
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
	defer cancel()
	ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
	if err == nil && ctr.State != nil {
		return ctr.State.Pid
	}
	return 0
}

func (e *dockerExecutor) Start() error {
	return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockercontainer.StartOptions{})
}

func (e *dockerExecutor) Stop() error {
	err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockercontainer.RemoveOptions{Force: true})
	if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
		err = nil
	}
	return err
}

// Wait for the container to terminate, capture the exit code, and
// wait for stdout/stderr logging to finish.
func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	watchdogErr := make(chan error, 1)
	go func() {
		ticker := time.NewTicker(e.watchdogInterval)
		defer ticker.Stop()
		for range ticker.C {
			dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
			ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
			dcancel()
			if ctx.Err() != nil {
				// Either the container already
				// exited, or our caller is trying to
				// kill it.
				return
			} else if err != nil {
				watchdogErr <- fmt.Errorf("error inspecting container: %s", err)
			} else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
				watchdogErr <- fmt.Errorf("container is not running: State=%v", ctr.State)
			} else {
				watchdogErr <- nil
			}
		}
	}()

	waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
	errors := 0
	for {
		select {
		case waitBody := <-waitOk:
			// wait for stdout/stderr to complete
			<-e.doneIO
			return int(waitBody.StatusCode), nil

		case err := <-waitErr:
			return -1, fmt.Errorf("container wait: %v", err)

		case <-ctx.Done():
			return -1, ctx.Err()

		case err := <-watchdogErr:
			if err == nil {
				errors = 0
			} else {
				e.logf("docker watchdog: %s", err)
				errors++
				if errors >= dockerWatchdogThreshold {
					e.logf("docker watchdog: giving up")
					return -1, err
				}
			}
		}
	}
}

func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
	resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockercontainer.AttachOptions{
		Stream: true,
		Stdin:  stdin != nil,
		Stdout: true,
		Stderr: true,
	})
	if err != nil {
		return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
	}
	var errStdin error
	if stdin != nil {
		go func() {
			errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
		}()
	}
	e.doneIO = make(chan struct{})
	go func() {
		e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
		if e.errIO == nil && errStdin != nil {
			e.errIO = errStdin
		}
		close(e.doneIO)
	}()
	return nil
}

func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
	defer closeConn()
	_, err := io.Copy(conn, stdin)
	if err != nil {
		return fmt.Errorf("While writing to docker container on stdin: %v", err)
	}
	return nil
}

// Handle docker log protocol; see
// https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
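//
// Each frame is an 8-byte header followed by a payload: byte 0 is the
// stream ID (1=stdout, 2=stderr), bytes 1-3 are unused, and bytes 4-7
// are the payload length as a big-endian uint32. For example, a header
// of 02 00 00 00 00 00 00 0c announces 12 bytes of stderr.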
func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
	header := make([]byte, 8)
	var err error
	for err == nil {
		_, err = io.ReadAtLeast(reader, header, 8)
		if err != nil {
			if err == io.EOF {
				err = nil
			}
			break
		}
		readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
		if header[0] == 1 {
			_, err = io.CopyN(stdout, reader, readsize)
		} else {
			_, err = io.CopyN(stderr, reader, readsize)
		}
	}
	if err != nil {
		return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
	}
	return nil
}

func (e *dockerExecutor) Close() {
	e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockercontainer.RemoveOptions{Force: true})
}

func (e *dockerExecutor) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, injectcmd []string) (*exec.Cmd, error) {
	cmd := exec.CommandContext(ctx, "docker", "exec", "-i", "--detach-keys="+detachKeys, "--user="+username)
	if usingTTY {
		cmd.Args = append(cmd.Args, "-t")
	}
	cmd.Args = append(cmd.Args, e.containerID)
	cmd.Args = append(cmd.Args, injectcmd...)
	return cmd, nil
}
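
// For illustration only: with usingTTY=true and a hypothetical
// injectcmd of []string{"/bin/bash", "-login"}, the assembled command
// line is
//
//	docker exec -i --detach-keys=<detachKeys> --user=<username> -t <containerID> /bin/bash -login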

func (e *dockerExecutor) IPAddress() (string, error) {
	if ip, ok := e.savedIPAddress.Load().(*string); ok {
		return *ip, nil
	}
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
	defer cancel()
	ctr, err := e.dockerclient.ContainerInspect(ctx, e.containerID)
	if err != nil {
		return "", fmt.Errorf("cannot get docker container info: %s", err)
	}
	ip := ctr.NetworkSettings.IPAddress
	if ip == "" {
		// TODO: try to enable networking if it wasn't
		// already enabled when the container was
		// created.
		return "", fmt.Errorf("container has no IP address")
	}
	e.savedIPAddress.Store(&ip)
	return ip, nil
}