X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d6703f161c75a780ff645dc1ec980fca8a3e315b..3ceb42c882b32fb9e4ef79679576254f98cdfb3d:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 64e79a68eb..c8f77f4917 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "context" "encoding/json" "errors" @@ -361,6 +362,13 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } } + if bind == "stdin" { + // Is it a "collection" mount kind? + if mnt.Kind != "collection" && mnt.Kind != "json" { + return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind) + } + } + if bind == "/etc/arvados/ca-certificates.crt" { needCertMount = false } @@ -663,8 +671,40 @@ func (runner *ContainerRunner) AttachStreams() (err error) { runner.CrunchLog.Print("Attaching container streams") + // If stdin mount is provided, attach it to the docker container + var stdinRdr keepclient.Reader + var stdinJson []byte + if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok { + if stdinMnt.Kind == "collection" { + var stdinColl arvados.Collection + collId := stdinMnt.UUID + if collId == "" { + collId = stdinMnt.PortableDataHash + } + err = runner.ArvClient.Get("collections", collId, nil, &stdinColl) + if err != nil { + return fmt.Errorf("While getting stding collection: %v", err) + } + + stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path) + if os.IsNotExist(err) { + return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path) + } else if err != nil { + return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err) + } + + defer stdinRdr.Close() + } else if stdinMnt.Kind == "json" { + stdinJson, err = json.Marshal(stdinMnt.Content) + if err != nil { + return fmt.Errorf("While encoding stdin json data: %v", err) + } + } + } + + stdinUsed := stdinRdr != nil || len(stdinJson) != 0 response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID, - dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true}) + dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true}) if err != nil { return fmt.Errorf("While attaching container stdout/stderr streams: %v", err) } @@ -698,6 +738,26 @@ func (runner *ContainerRunner) AttachStreams() (err error) { } runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr")) + if stdinRdr != nil { + go func() { + _, err := io.Copy(response.Conn, stdinRdr) + if err != nil { + runner.CrunchLog.Print("While writing stdin collection to docker container %q", err) + runner.stop() + } + response.Conn.Close() + }() + } else if len(stdinJson) != 0 { + go func() { + _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson)) + if err != nil { + runner.CrunchLog.Print("While writing stdin json to docker container %q", err) + runner.stop() + } + response.Conn.Close() + }() + } + go runner.ProcessDockerAttach(response.Reader) return nil @@ -743,7 +803,7 @@ func (runner *ContainerRunner) CreateContainer() error { } } - createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, nil, nil, "") + createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID) if err != nil { return fmt.Errorf("While creating container: %v", err) } @@ -780,7 +840,7 @@ func (runner *ContainerRunner) WaitFinish() error { return fmt.Errorf("container wait: %v", err) } - runner.CrunchLog.Printf("container wait API status code: %v", waitDocker) + runner.CrunchLog.Printf("Container exited with code: %v", waitDocker) code := int(waitDocker) runner.ExitCode = &code