X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/97e4c0fa88224bb282ddd502dc8fda18cca3ba21..8974e10fe2dbf643861f75cffa0fba9a6db972c8:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 01edb0a516..cebebb104a 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -15,6 +15,7 @@ import ( "os" "os/exec" "os/signal" + "path" "strings" "sync" "syscall" @@ -44,6 +45,7 @@ type Mount struct { PortableDataHash string `json:"portable_data_hash"` UUID string `json:"uuid"` DeviceType string `json:"device_type"` + Path string `json:"path"` } // Collection record returned by the API server. @@ -99,7 +101,7 @@ type ContainerRunner struct { NewLogWriter loggingDone chan bool CrunchLog *ThrottledLogger - Stdout *ThrottledLogger + Stdout io.WriteCloser Stderr *ThrottledLogger LogCollection *CollectionWriter LogsPDH *string @@ -246,6 +248,22 @@ func (runner *ContainerRunner) SetupMounts() (err error) { runner.Binds = nil for bind, mnt := range runner.ContainerRecord.Mounts { + if bind == "stdout" { + // Is it a "file" mount kind? + if mnt.Kind != "file" { + return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind) + } + + // Does path start with OutputPath? + prefix := runner.ContainerRecord.OutputPath + if !strings.HasSuffix(prefix, "/") { + prefix += "/" + } + if !strings.HasPrefix(mnt.Path, prefix) { + return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix) + } + } + if mnt.Kind == "collection" { var src string if mnt.UUID != "" && mnt.PortableDataHash != "" { @@ -296,8 +314,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } else { runner.Binds = append(runner.Binds, bind) } - } else { - return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind) } } @@ -383,7 +399,31 @@ func (runner *ContainerRunner) AttachStreams() (err error) { runner.loggingDone = make(chan bool) - runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout")) + if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok { + stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):] + index := strings.LastIndex(stdoutPath, "/") + if index > 0 { + subdirs := stdoutPath[:index] + if subdirs != "" { + st, err := os.Stat(runner.HostOutputDir) + if err != nil { + return fmt.Errorf("While Stat on temp dir: %v", err) + } + stdoutPath := path.Join(runner.HostOutputDir, subdirs) + err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777) + if err != nil { + return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err) + } + } + } + stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath)) + if err != nil { + return fmt.Errorf("While creating stdout file: %v", err) + } + runner.Stdout = stdoutFile + } else { + runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout")) + } runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr")) go runner.ProcessDockerAttach(containerReader)