8465: added stdin redirection for json mount.
[arvados.git] / services / crunch-run / crunchrun.go
index 26c8a7a7eab29e420c69181d3fd19e825fe5a72f..f22680fca568c92fe9e765185120789d4a37913d 100644 (file)
@@ -1,6 +1,8 @@
 package main
 
 import (
+       "bytes"
+       "context"
        "encoding/json"
        "errors"
        "flag"
@@ -24,7 +26,11 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
-       "github.com/curoverse/dockerclient"
+
+       dockertypes "github.com/docker/docker/api/types"
+       dockercontainer "github.com/docker/docker/api/types/container"
+       dockernetwork "github.com/docker/docker/api/types/network"
+       dockerclient "github.com/docker/docker/client"
 )
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -44,6 +50,7 @@ var ErrCancelled = errors.New("Cancelled")
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
+       CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -55,14 +62,62 @@ type MkTempDir func(string, string) (string, error)
 
 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
 type ThinDockerClient interface {
-       StopContainer(id string, timeout int) error
-       InspectImage(id string) (*dockerclient.ImageInfo, error)
-       LoadImage(reader io.Reader) error
-       CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
-       StartContainer(id string, config *dockerclient.HostConfig) error
-       AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
-       Wait(id string) <-chan dockerclient.WaitResult
-       RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
+       ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
+       ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+               networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
+       ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
+       ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+       ContainerWait(ctx context.Context, container string) (int64, error)
+       ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
+       ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
+       ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
+}
+
+// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
+// that executes the docker requests on dockerclient.Client
+type ThinDockerClientProxy struct {
+       Docker *dockerclient.Client
+}
+
+// ContainerAttach invokes dockerclient.Client.ContainerAttach
+func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
+       return proxy.Docker.ContainerAttach(ctx, container, options)
+}
+
+// ContainerCreate invokes dockerclient.Client.ContainerCreate
+func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+       networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
+       return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
+}
+
+// ContainerStart invokes dockerclient.Client.ContainerStart
+func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+       return proxy.Docker.ContainerStart(ctx, container, options)
+}
+
+// ContainerStop invokes dockerclient.Client.ContainerStop
+func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+       return proxy.Docker.ContainerStop(ctx, container, timeout)
+}
+
+// ContainerWait invokes dockerclient.Client.ContainerWait
+func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) {
+       return proxy.Docker.ContainerWait(ctx, container)
+}
+
+// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
+func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+       return proxy.Docker.ImageInspectWithRaw(ctx, image)
+}
+
+// ImageLoad invokes dockerclient.Client.ImageLoad
+func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+       return proxy.Docker.ImageLoad(ctx, input, quiet)
+}
+
+// ImageRemove invokes dockerclient.Client.ImageRemove
+func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
+       return proxy.Docker.ImageRemove(ctx, image, options)
 }
 
 // ContainerRunner is the main stateful struct used for a single execution of a
@@ -72,8 +127,8 @@ type ContainerRunner struct {
        ArvClient IArvadosClient
        Kc        IKeepClient
        arvados.Container
-       dockerclient.ContainerConfig
-       dockerclient.HostConfig
+       ContainerConfig dockercontainer.Config
+       dockercontainer.HostConfig
        token       string
        ContainerID string
        ExitCode    *int
@@ -117,6 +172,9 @@ type ContainerRunner struct {
        cStateLock sync.Mutex
        cStarted   bool // StartContainer() succeeded
        cCancelled bool // StopContainer() invoked
+
+       enableNetwork string // one of "default" or "always"
+       networkMode   string // passed through to HostConfig.NetworkMode
 }
 
 // SetupSignals sets up signal handling to gracefully terminate the underlying
@@ -143,7 +201,8 @@ func (runner *ContainerRunner) stop() {
        }
        runner.cCancelled = true
        if runner.cStarted {
-               err := runner.Docker.StopContainer(runner.ContainerID, 10)
+               timeout := time.Duration(10)
+               err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
                if err != nil {
                        log.Printf("StopContainer failed: %s", err)
                }
@@ -174,7 +233,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 
        runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
 
-       _, err = runner.Docker.InspectImage(imageID)
+       _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
        if err != nil {
                runner.CrunchLog.Print("Loading Docker image from keep")
 
@@ -184,10 +243,11 @@ func (runner *ContainerRunner) LoadImage() (err error) {
                        return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
                }
 
-               err = runner.Docker.LoadImage(readCloser)
+               response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, false)
                if err != nil {
                        return fmt.Errorf("While loading container image into Docker: %v", err)
                }
+               response.Body.Close()
        } else {
                runner.CrunchLog.Print("Docker image is available")
        }
@@ -303,6 +363,13 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                }
 
+               if bind == "stdin" {
+                       // Is it a "collection" mount kind?
+                       if mnt.Kind != "collection" && mnt.Kind != "json" {
+                               return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
+                       }
+               }
+
                if bind == "/etc/arvados/ca-certificates.crt" {
                        needCertMount = false
                }
@@ -605,9 +672,40 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 
        runner.CrunchLog.Print("Attaching container streams")
 
-       var containerReader io.Reader
-       containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
-               &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
+       // If stdin mount is provided, attach it to the docker container
+       var stdinRdr keepclient.Reader
+       var stdinJson []byte
+       if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+               if stdinMnt.Kind == "collection" {
+                       var stdinColl arvados.Collection
+                       collId := stdinMnt.UUID
+                       if collId == "" {
+                               collId = stdinMnt.PortableDataHash
+                       }
+                       err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+                       if err != nil {
+                               return fmt.Errorf("While getting stding collection: %v", err)
+                       }
+
+                       stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
+                       if os.IsNotExist(err) {
+                               return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+                       } else if err != nil {
+                               return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+                       }
+
+                       defer stdinRdr.Close()
+               } else if stdinMnt.Kind == "json" {
+                       stdinJson, err = json.Marshal(stdinMnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("While encoding stdin json data: %v", err)
+                       }
+               }
+       }
+
+       stdinUsed := stdinRdr != nil || len(stdinJson) != 0
+       response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
+               dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
                return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
        }
@@ -641,7 +739,34 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        }
        runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
 
-       go runner.ProcessDockerAttach(containerReader)
+       if stdinRdr != nil {
+               copyErrC := make(chan error)
+               go func() {
+                       _, err := io.Copy(response.Conn, stdinRdr)
+                       copyErrC <- err
+                       close(copyErrC)
+               }()
+
+               copyErr := <-copyErrC
+               if copyErr != nil {
+                       return fmt.Errorf("While writing stdin collection to docker container %q", copyErr)
+               }
+       } else if len(stdinJson) != 0 {
+               copyErrC := make(chan error)
+               go func() {
+                       jsonRdr := bytes.NewReader(stdinJson)
+                       _, err := io.Copy(response.Conn, jsonRdr)
+                       copyErrC <- err
+                       close(copyErrC)
+               }()
+
+               copyErr := <-copyErrC
+               if copyErr != nil {
+                       return fmt.Errorf("While writing stdin json to docker container %q", copyErr)
+               }
+       }
+
+       go runner.ProcessDockerAttach(response.Reader)
 
        return nil
 }
@@ -658,6 +783,15 @@ func (runner *ContainerRunner) CreateContainer() error {
        for k, v := range runner.Container.Environment {
                runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
        }
+
+       runner.HostConfig = dockercontainer.HostConfig{
+               Binds:  runner.Binds,
+               Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
+               LogConfig: dockercontainer.LogConfig{
+                       Type: "none",
+               },
+       }
+
        if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
                tok, err := runner.ContainerToken()
                if err != nil {
@@ -668,24 +802,21 @@ func (runner *ContainerRunner) CreateContainer() error {
                        "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
                        "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
                )
-               runner.ContainerConfig.NetworkDisabled = false
+               runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
        } else {
-               runner.ContainerConfig.NetworkDisabled = true
+               if runner.enableNetwork == "always" {
+                       runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
+               } else {
+                       runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
+               }
        }
 
-       var err error
-       runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
+       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)
        }
 
-       runner.HostConfig = dockerclient.HostConfig{
-               Binds:        runner.Binds,
-               CgroupParent: runner.setCgroupParent,
-               LogConfig: dockerclient.LogConfig{
-                       Type: "none",
-               },
-       }
+       runner.ContainerID = createdBody.ID
 
        return runner.AttachStreams()
 }
@@ -698,7 +829,8 @@ func (runner *ContainerRunner) StartContainer() error {
        if runner.cCancelled {
                return ErrCancelled
        }
-       err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
+       err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
+               dockertypes.ContainerStartOptions{})
        if err != nil {
                return fmt.Errorf("could not start container: %v", err)
        }
@@ -711,21 +843,22 @@ func (runner *ContainerRunner) StartContainer() error {
 func (runner *ContainerRunner) WaitFinish() error {
        runner.CrunchLog.Print("Waiting for container to finish")
 
-       waitDocker := runner.Docker.Wait(runner.ContainerID)
+       waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID)
+       if err != nil {
+               return fmt.Errorf("container wait: %v", err)
+       }
+
+       runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
+       code := int(waitDocker)
+       runner.ExitCode = &code
+
        waitMount := runner.ArvMountExit
-       for waitDocker != nil {
-               select {
-               case err := <-waitMount:
-                       runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
-                       waitMount = nil
-                       runner.stop()
-               case wr := <-waitDocker:
-                       if wr.Error != nil {
-                               return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
-                       }
-                       runner.ExitCode = &wr.ExitCode
-                       waitDocker = nil
-               }
+       select {
+       case err := <-waitMount:
+               runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
+               waitMount = nil
+               runner.stop()
+       default:
        }
 
        // wait for stdout/stderr to complete
@@ -1149,6 +1282,14 @@ func main() {
        cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
        cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
+       enableNetwork := flag.String("container-enable-networking", "default",
+               `Specify if networking should be enabled for container.  One of 'default', 'always':
+       default: only enable networking if container requests it.
+       always:  containers always have networking enabled
+       `)
+       networkMode := flag.String("container-network-mode", "default",
+               `Set networking mode for container.  Corresponds to Docker network mode (--net).
+       `)
        flag.Parse()
 
        containerId := flag.Arg(0)
@@ -1170,16 +1311,22 @@ func main() {
        }
        kc.Retries = 4
 
-       var docker *dockerclient.DockerClient
-       docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+       var docker *dockerclient.Client
+       // API version 1.21 corresponds to Docker 1.9, which is currently the
+       // minimum version we want to support.
+       docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
        if err != nil {
                log.Fatalf("%s: %v", containerId, err)
        }
 
-       cr := NewContainerRunner(api, kc, docker, containerId)
+       dockerClientProxy := ThinDockerClientProxy{Docker: docker}
+
+       cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent
+       cr.enableNetwork = *enableNetwork
+       cr.networkMode = *networkMode
        if *cgroupParentSubsystem != "" {
                p := findCgroup(*cgroupParentSubsystem)
                cr.setCgroupParent = p