"os"
"os/exec"
"os/signal"
+ "path"
"strings"
"sync"
"syscall"
PortableDataHash string `json:"portable_data_hash"`
UUID string `json:"uuid"`
DeviceType string `json:"device_type"`
+ Path string `json:"path"`
}
// Collection record returned by the API server.
NewLogWriter
loggingDone chan bool
CrunchLog *ThrottledLogger
- Stdout *ThrottledLogger
+ Stdout io.WriteCloser
Stderr *ThrottledLogger
LogCollection *CollectionWriter
LogsPDH *string
runner.Binds = nil
for bind, mnt := range runner.ContainerRecord.Mounts {
+ if bind == "stdout" {
+ // Is it a "file" mount kind?
+ if mnt.Kind != "file" {
+ return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+ }
+
+ // Does path start with OutputPath?
+ prefix := runner.ContainerRecord.OutputPath
+ if !strings.HasSuffix(prefix, "/") {
+ prefix += "/"
+ }
+ if !strings.HasPrefix(mnt.Path, prefix) {
+ return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+ }
+ }
+
if mnt.Kind == "collection" {
var src string
if mnt.UUID != "" && mnt.PortableDataHash != "" {
} else {
runner.Binds = append(runner.Binds, bind)
}
- } else {
- return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
}
}
runner.loggingDone = make(chan bool)
- runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+ if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok {
+ stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):]
+ index := strings.LastIndex(stdoutPath, "/")
+ if index > 0 {
+ subdirs := stdoutPath[:index]
+ if subdirs != "" {
+ st, err := os.Stat(runner.HostOutputDir)
+ if err != nil {
+ return fmt.Errorf("While Stat on temp dir: %v", err)
+ }
+ stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+ err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+ if err != nil {
+ return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+ }
+ }
+ }
+ stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ if err != nil {
+ return fmt.Errorf("While creating stdout file: %v", err)
+ }
+ runner.Stdout = stdoutFile
+ } else {
+ runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+ }
runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
go runner.ProcessDockerAttach(containerReader)
cr.CleanupDirs()
}
}
+
+// TestStdout runs a container whose "stdout" mount is a file located
+// under the output path, then checks that the container is reported as
+// Complete with exit code 0 and that the output collection manifest
+// lists the echoed text at a/b/c.out.
+func (s *TestSuite) TestStdout(c *C) {
+	// Container record: stdout goes to /tmp/a/b/c.out, which lies inside
+	// the /tmp output path and needs the a/b subdirectories created.
+	helperRecord := `{` +
+		`"command": ["/bin/sh", "-c", "echo $FROBIZ"],` +
+		`"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",` +
+		`"cwd": "/bin",` +
+		`"environment": {"FROBIZ": "bilbo"},` +
+		`"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"} },` +
+		`"output_path": "/tmp",` +
+		`"priority": 1,` +
+		`"runtime_constraints": {}` +
+		`}`
+
+	api, _ := FullRunHelper(c, helperRecord, func(dc *TestDockerClient) {
+		// Drop the first 7 bytes of the first environment entry
+		// (presumably the "FROBIZ=" prefix) and emit the remainder as
+		// the container's stdout.
+		value := dc.env[0][7:]
+		dc.logWriter.Write(dockerLog(1, value+"\n"))
+		dc.logWriter.Close()
+		dc.finish <- dockerclient.WaitResult{ExitCode: 0}
+	})
+
+	c.Check(api.Calls, Equals, 6)
+
+	// The sixth API call carries the final container state.
+	finalContainer := api.Content[5]["container"].(arvadosclient.Dict)
+	c.Check(finalContainer["exit_code"], Equals, 0)
+	c.Check(finalContainer["state"], Equals, "Complete")
+
+	// The third API call carries the output collection.
+	outputCollection := api.Content[2]["collection"].(arvadosclient.Dict)
+	c.Check(outputCollection["manifest_text"], Equals, "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n")
+}
+
+// StdoutErrorRunHelper decodes record into a ContainerRecord, builds a
+// ContainerRunner on top of test API, Keep and Docker clients, and
+// returns those objects together with the result of cr.Run().
+//
+// It is shared by the TestStdoutWithWrongPath and
+// TestStdoutWithWrongKind* tests, which only inspect the returned error.
+// fn is installed as the test Docker client's callback.
+func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
+	rec := ContainerRecord{}
+	err = json.NewDecoder(strings.NewReader(record)).Decode(&rec)
+	// A fixture that does not parse makes the rest of the run
+	// meaningless, so abort the test rather than running with a
+	// partially decoded record.
+	c.Assert(err, IsNil)
+
+	docker := NewTestDockerClient()
+	docker.fn = fn
+	docker.RemoveImage(hwImageId, true)
+
+	api = &ArvTestClient{ContainerRecord: rec}
+	cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	am := &ArvMountCmdLine{}
+	cr.RunArvMount = am.ArvMountTest
+
+	err = cr.Run()
+	return api, cr, err
+}
+
+// TestStdoutWithWrongPath checks that the run fails when the stdout
+// mount path ("/tmpa.out") does not lie under the output path ("/tmp").
+func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
+	_, _, err := StdoutErrorRunHelper(c, `{
+ "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} },
+ "output_path": "/tmp"
+}`, func(t *TestDockerClient) {})
+
+	// ErrorMatches reports a failed check when err is nil, whereas
+	// calling err.Error() after a non-fatal NotNil check would panic.
+	c.Check(err, ErrorMatches, `(?s).*Stdout path does not start with OutputPath.*`)
+}
+
+// TestStdoutWithWrongKindTmp checks that the run fails when the stdout
+// mount is declared with kind "tmp" instead of "file".
+func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
+	_, _, err := StdoutErrorRunHelper(c, `{
+ "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} },
+ "output_path": "/tmp"
+}`, func(t *TestDockerClient) {})
+
+	// ErrorMatches reports a failed check when err is nil, whereas
+	// calling err.Error() after a non-fatal NotNil check would panic.
+	c.Check(err, ErrorMatches, `(?s).*Unsupported mount kind 'tmp' for stdout.*`)
+}
+
+// TestStdoutWithWrongKindCollection checks that the run fails when the
+// stdout mount is declared with kind "collection" instead of "file".
+func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
+	_, _, err := StdoutErrorRunHelper(c, `{
+ "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} },
+ "output_path": "/tmp"
+}`, func(t *TestDockerClient) {})
+
+	// ErrorMatches reports a failed check when err is nil, whereas
+	// calling err.Error() after a non-fatal NotNil check would panic.
+	c.Check(err, ErrorMatches, `(?s).*Unsupported mount kind 'collection' for stdout.*`)
+}