package main
import (
+ "bytes"
"context"
"encoding/json"
"errors"
}
}
+ if bind == "stdin" {
+ // Is it a "collection" mount kind?
+ if mnt.Kind != "collection" && mnt.Kind != "json" {
+ return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
+ }
+ }
+
if bind == "/etc/arvados/ca-certificates.crt" {
needCertMount = false
}
runner.CrunchLog.Print("Attaching container streams")
+ // If stdin mount is provided, attach it to the docker container
+ var stdinRdr keepclient.Reader
+ var stdinJson []byte
+ if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+ if stdinMnt.Kind == "collection" {
+ var stdinColl arvados.Collection
+ collId := stdinMnt.UUID
+ if collId == "" {
+ collId = stdinMnt.PortableDataHash
+ }
+ err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+ if err != nil {
+ return fmt.Errorf("While getting stding collection: %v", err)
+ }
+
+ stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+ if os.IsNotExist(err) {
+ return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+ } else if err != nil {
+ return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+ }
+
+ defer stdinRdr.Close()
+ } else if stdinMnt.Kind == "json" {
+ stdinJson, err = json.Marshal(stdinMnt.Content)
+ if err != nil {
+ return fmt.Errorf("While encoding stdin json data: %v", err)
+ }
+ }
+ }
+
+ stdinUsed := stdinRdr != nil || len(stdinJson) != 0
response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
- dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
+ dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
if err != nil {
return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
}
}
runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+ if stdinRdr != nil {
+ go func() {
+ _, err := io.Copy(response.Conn, stdinRdr)
+ if err != nil {
+ runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+ runner.stop()
+ }
+ response.Conn.Close()
+ }()
+ } else if len(stdinJson) != 0 {
+ go func() {
+ _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+ if err != nil {
+ runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+ runner.stop()
+ }
+ response.Conn.Close()
+ }()
+ }
+
go runner.ProcessDockerAttach(response.Reader)
return nil
}
}
- createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, nil, nil, "")
+ createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
if err != nil {
return fmt.Errorf("While creating container: %v", err)
}
return fmt.Errorf("container wait: %v", err)
}
- runner.CrunchLog.Printf("container wait API status code: %v", waitDocker)
+ runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
code := int(waitDocker)
runner.ExitCode = &code