package main
import (
+ "bytes"
"context"
"encoding/json"
"errors"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
+
dockertypes "github.com/docker/docker/api/types"
- containertypes "github.com/docker/docker/api/types/container"
+ dockercontainer "github.com/docker/docker/api/types/container"
+ dockernetwork "github.com/docker/docker/api/types/network"
dockerclient "github.com/docker/docker/client"
)
// ThinDockerClient is the minimal Docker client interface used by crunch-run.
type ThinDockerClient interface {
- ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
- ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
- ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
- ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig,
- networkingConfig *network.NetworkingConfig, containerName string) (container.ContainerCreateCreatedBody, error)
- ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
+ ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+ networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
+ ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
ContainerWait(ctx context.Context, container string) (int64, error)
+ ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
+ ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
+ ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
+}
+
+// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
+// that executes the docker requests on dockerclient.Client
+type ThinDockerClientProxy struct {
+ Docker *dockerclient.Client
+}
+
+// ContainerAttach invokes dockerclient.Client.ContainerAttach
+func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
+ return proxy.Docker.ContainerAttach(ctx, container, options)
+}
+
+// ContainerCreate invokes dockerclient.Client.ContainerCreate
+func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+ networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
+ return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
+}
+
+// ContainerStart invokes dockerclient.Client.ContainerStart
+func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+ return proxy.Docker.ContainerStart(ctx, container, options)
+}
+
+// ContainerStop invokes dockerclient.Client.ContainerStop
+func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+ return proxy.Docker.ContainerStop(ctx, container, timeout)
+}
+
+// ContainerWait invokes dockerclient.Client.ContainerWait
+func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) {
+ return proxy.Docker.ContainerWait(ctx, container)
+}
+
+// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
+func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+ return proxy.Docker.ImageInspectWithRaw(ctx, image)
+}
+
+// ImageLoad invokes dockerclient.Client.ImageLoad
+func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+ return proxy.Docker.ImageLoad(ctx, input, quiet)
+}
+
+// ImageRemove invokes dockerclient.Client.ImageRemove
+func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
+ return proxy.Docker.ImageRemove(ctx, image, options)
}
// ContainerRunner is the main stateful struct used for a single execution of a
ArvClient IArvadosClient
Kc IKeepClient
arvados.Container
- ContainerConfig containertypes.Config
- dockerclient.HostConfig
+ ContainerConfig dockercontainer.Config
+ dockercontainer.HostConfig
token string
ContainerID string
ExitCode *int
loggingDone chan bool
CrunchLog *ThrottledLogger
Stdout io.WriteCloser
- Stderr *ThrottledLogger
+ Stderr io.WriteCloser
LogCollection *CollectionWriter
LogsPDH *string
RunArvMount
}
runner.cCancelled = true
if runner.cStarted {
- err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, 10)
+ timeout := time.Duration(10)
+ err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
if err != nil {
log.Printf("StopContainer failed: %s", err)
}
return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
}
- response, err = runner.Docker.ImageLoad(context.TODO(), readCloser, false)
- response.Body.Close()
+ response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, false)
if err != nil {
return fmt.Errorf("While loading container image into Docker: %v", err)
}
+ response.Body.Close()
} else {
runner.CrunchLog.Print("Docker image is available")
}
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
- if bind == "stdout" {
+ if bind == "stdout" || bind == "stderr" {
// Is it a "file" mount kind?
if mnt.Kind != "file" {
- return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+ return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
}
// Does path start with OutputPath?
prefix += "/"
}
if !strings.HasPrefix(mnt.Path, prefix) {
- return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+ return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
+ }
+ }
+
+ if bind == "stdin" {
+ // Is it a "collection" mount kind?
+ if mnt.Kind != "collection" && mnt.Kind != "json" {
+ return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
}
}
return nil
}
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
+// AttachStreams connects the docker container stdin, stdout and stderr logs
+// to the Arvados logger which logs to Keep and the API server logs table.
func (runner *ContainerRunner) AttachStreams() (err error) {
runner.CrunchLog.Print("Attaching container streams")
- var containerReader io.Reader
- containerReader, err = runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
- &dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
+ // If stdin mount is provided, attach it to the docker container
+ var stdinRdr keepclient.Reader
+ var stdinJson []byte
+ if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+ if stdinMnt.Kind == "collection" {
+ var stdinColl arvados.Collection
+ collId := stdinMnt.UUID
+ if collId == "" {
+ collId = stdinMnt.PortableDataHash
+ }
+ err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+ if err != nil {
+ return fmt.Errorf("While getting stding collection: %v", err)
+ }
+
+ stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+ if os.IsNotExist(err) {
+ return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+ } else if err != nil {
+ return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+ }
+
+ defer stdinRdr.Close()
+ } else if stdinMnt.Kind == "json" {
+ stdinJson, err = json.Marshal(stdinMnt.Content)
+ if err != nil {
+ return fmt.Errorf("While encoding stdin json data: %v", err)
+ }
+ }
+ }
+
+ stdinUsed := stdinRdr != nil || len(stdinJson) != 0
+ response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
+ dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
if err != nil {
return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
}
runner.loggingDone = make(chan bool)
if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
- stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
- index := strings.LastIndex(stdoutPath, "/")
- if index > 0 {
- subdirs := stdoutPath[:index]
- if subdirs != "" {
- st, err := os.Stat(runner.HostOutputDir)
- if err != nil {
- return fmt.Errorf("While Stat on temp dir: %v", err)
- }
- stdoutPath := path.Join(runner.HostOutputDir, subdirs)
- err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
- if err != nil {
- return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
- }
- }
- }
- stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
if err != nil {
- return fmt.Errorf("While creating stdout file: %v", err)
+ return err
}
runner.Stdout = stdoutFile
} else {
runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
}
- runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
- go runner.ProcessDockerAttach(containerReader)
+ if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
+ stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
+ if err != nil {
+ return err
+ }
+ runner.Stderr = stderrFile
+ } else {
+ runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+ }
+
+ if stdinRdr != nil {
+ go func() {
+ _, err := io.Copy(response.Conn, stdinRdr)
+ if err != nil {
+ runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+ runner.stop()
+ }
+ response.Conn.Close()
+ }()
+ } else if len(stdinJson) != 0 {
+ go func() {
+ _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+ if err != nil {
+ runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+ runner.stop()
+ }
+ response.Conn.Close()
+ }()
+ }
+
+ go runner.ProcessDockerAttach(response.Reader)
return nil
}
+func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
+ stdoutPath := mntPath[len(runner.Container.OutputPath):]
+ index := strings.LastIndex(stdoutPath, "/")
+ if index > 0 {
+ subdirs := stdoutPath[:index]
+ if subdirs != "" {
+ st, err := os.Stat(runner.HostOutputDir)
+ if err != nil {
+ return nil, fmt.Errorf("While Stat on temp dir: %v", err)
+ }
+ stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+ err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+ if err != nil {
+ return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+ }
+ }
+ }
+ stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ if err != nil {
+ return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
+ }
+
+ return stdoutFile, nil
+}
+
// CreateContainer creates the docker container.
func (runner *ContainerRunner) CreateContainer() error {
runner.CrunchLog.Print("Creating Docker container")
runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
}
- runner.ContainerID = createdBody.ID
- runner.HostConfig = dockerclient.HostConfig{
- Binds: runner.Binds,
- CgroupParent: runner.setCgroupParent,
- LogConfig: dockerclient.LogConfig{
+ runner.HostConfig = dockercontainer.HostConfig{
+ Binds: runner.Binds,
+ Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
+ LogConfig: dockercontainer.LogConfig{
Type: "none",
},
}
"ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
"ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
)
- runner.HostConfig.NetworkMode = runner.networkMode
+ runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
} else {
if runner.enableNetwork == "always" {
- runner.HostConfig.NetworkMode = runner.networkMode
+ runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
} else {
- runner.HostConfig.NetworkMode = "none"
+ runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
}
}
- createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, nil, nil, "")
+ createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
if err != nil {
return fmt.Errorf("While creating container: %v", err)
}
+ runner.ContainerID = createdBody.ID
+
return runner.AttachStreams()
}
if runner.cCancelled {
return ErrCancelled
}
- err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
+ err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
+ dockertypes.ContainerStartOptions{})
if err != nil {
return fmt.Errorf("could not start container: %v", err)
}
func (runner *ContainerRunner) WaitFinish() error {
runner.CrunchLog.Print("Waiting for container to finish")
- waitDocker := runner.Docker.Wait(runner.ContainerID)
+ waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID)
+ if err != nil {
+ return fmt.Errorf("container wait: %v", err)
+ }
+
+ runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
+ code := int(waitDocker)
+ runner.ExitCode = &code
+
waitMount := runner.ArvMountExit
- for waitDocker != nil {
- select {
- case err := <-waitMount:
- runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
- waitMount = nil
- runner.stop()
- case wr := <-waitDocker:
- if wr.Error != nil {
- return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
- }
- runner.ExitCode = &wr.ExitCode
- waitDocker = nil
- }
+ select {
+ case err := <-waitMount:
+ runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
+ waitMount = nil
+ runner.stop()
+ default:
}
// wait for stdout/stderr to complete
log.Fatalf("%s: %v", containerId, err)
}
- cr := NewContainerRunner(api, kc, docker, containerId)
+ dockerClientProxy := ThinDockerClientProxy{Docker: docker}
+
+ cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent