-// AttachStreams connects the docker container stdin, stdout and stderr logs
-// to the Arvados logger which logs to Keep and the API server logs table.
-func (runner *ContainerRunner) AttachStreams() (err error) {
-
- runner.CrunchLog.Print("Attaching container streams")
-
- // If stdin mount is provided, attach it to the docker container
- var stdinRdr arvados.File
- var stdinJson []byte
- if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
- if stdinMnt.Kind == "collection" {
- var stdinColl arvados.Collection
- collId := stdinMnt.UUID
- if collId == "" {
- collId = stdinMnt.PortableDataHash
- }
- err = runner.ContainerArvClient.Get("collections", collId, nil, &stdinColl)
- if err != nil {
- return fmt.Errorf("While getting stdin collection: %v", err)
- }
-
- stdinRdr, err = runner.ContainerKeepClient.ManifestFileReader(
- manifest.Manifest{Text: stdinColl.ManifestText},
- stdinMnt.Path)
- if os.IsNotExist(err) {
- return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
- } else if err != nil {
- return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
- }
- } else if stdinMnt.Kind == "json" {
- stdinJson, err = json.Marshal(stdinMnt.Content)
- if err != nil {
- return fmt.Errorf("While encoding stdin json data: %v", err)
- }
- }
- }
-
- stdinUsed := stdinRdr != nil || len(stdinJson) != 0
- response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
- dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
- if err != nil {
- return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
- }
-
- runner.loggingDone = make(chan bool)
-
- if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
- stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
- if err != nil {
- return err
- }
- runner.Stdout = stdoutFile
- } else if w, err := runner.NewLogWriter("stdout"); err != nil {
- return err
- } else {
- runner.Stdout = NewThrottledLogger(w)
- }
-
- if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
- stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
- if err != nil {
- return err
- }
- runner.Stderr = stderrFile
- } else if w, err := runner.NewLogWriter("stderr"); err != nil {
- return err
- } else {
- runner.Stderr = NewThrottledLogger(w)
- }
-
- if stdinRdr != nil {
- go func() {
- _, err := io.Copy(response.Conn, stdinRdr)
- if err != nil {
- runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err)
- runner.stop(nil)
- }
- stdinRdr.Close()
- response.CloseWrite()
- }()
- } else if len(stdinJson) != 0 {
- go func() {
- _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
- if err != nil {
- runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
- runner.stop(nil)
- }
- response.CloseWrite()
- }()
- }
-
- go runner.ProcessDockerAttach(response.Reader)
-
- return nil
-}
-