13111: Merge branch 'master' into 12308-go-fuse
[arvados.git] / services / crunch-run / crunchrun.go
index 705bea486b21797ea4c6523ba28e03dda4ad269c..a9f1c25d37fa1714868d371c245dc7aa8d041543 100644 (file)
@@ -103,16 +103,16 @@ type ContainerRunner struct {
        LogsPDH       *string
        RunArvMount
        MkTempDir
-       ArvMount       *exec.Cmd
-       ArvMountPoint  string
-       HostOutputDir  string
-       CleanupTempDir []string
-       Binds          []string
-       Volumes        map[string]struct{}
-       OutputPDH      *string
-       SigChan        chan os.Signal
-       ArvMountExit   chan error
-       finalState     string
+       ArvMount      *exec.Cmd
+       ArvMountPoint string
+       HostOutputDir string
+       Binds         []string
+       Volumes       map[string]struct{}
+       OutputPDH     *string
+       SigChan       chan os.Signal
+       ArvMountExit  chan error
+       finalState    string
+       parentTemp    string
 
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
@@ -327,11 +327,42 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
 
 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
        if runner.ArvMountPoint == "" {
-               runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
+               runner.ArvMountPoint, err = runner.MkTempDir(runner.parentTemp, prefix)
        }
        return
 }
 
// copyfile copies the contents of the file at src into the file at
// dst. Missing parent directories of dst are created first, and dst
// itself is created, or truncated if it already exists.
//
// It returns the first error encountered while opening src, creating
// the parent directories, creating dst, copying the data, or closing
// either file. Both files are closed on every return path, including
// the failure paths.
func copyfile(src string, dst string) (err error) {
	srcfile, err := os.Open(src)
	if err != nil {
		return err
	}
	defer func() {
		// Report a close failure only when nothing earlier has
		// already explained why the copy failed.
		if cerr := srcfile.Close(); err == nil {
			err = cerr
		}
	}()

	// Fail here with the real cause rather than letting os.Create
	// report a less specific error for the same problem.
	if err = os.MkdirAll(path.Dir(dst), 0777); err != nil {
		return err
	}

	dstfile, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer func() {
		// Runs before the source's deferred close. A close error on
		// the destination can mean the data was not fully written,
		// so it must reach the caller.
		if cerr := dstfile.Close(); err == nil {
			err = cerr
		}
	}()

	_, err = io.Copy(dstfile, srcfile)
	return err
}
+
 func (runner *ContainerRunner) SetupMounts() (err error) {
        err = runner.SetupArvMountPoint("keep")
        if err != nil {
@@ -359,6 +390,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        runner.Binds = nil
        runner.Volumes = make(map[string]struct{})
        needCertMount := true
+       type copyFile struct {
+               src  string
+               bind string
+       }
+       var copyFiles []copyFile
 
        var binds []string
        for bind := range runner.Container.Mounts {
@@ -414,7 +450,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                pdhOnly = false
                                src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
                        } else if mnt.PortableDataHash != "" {
-                               if mnt.Writable {
+                               if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
                                        return fmt.Errorf("Can never write to a collection specified by portable data hash")
                                }
                                idx := strings.Index(mnt.PortableDataHash, "/")
@@ -441,10 +477,12 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if mnt.Writable {
                                if bind == runner.Container.OutputPath {
                                        runner.HostOutputDir = src
+                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                                } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
-                                       return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
+                                       copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
+                               } else {
+                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                                }
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                        } else {
                                runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
                        }
@@ -452,7 +490,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
                case mnt.Kind == "tmp":
                        var tmpdir string
-                       tmpdir, err = runner.MkTempDir("", "")
+                       tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
                        if err != nil {
                                return fmt.Errorf("While creating mount temp dir: %v", err)
                        }
@@ -464,7 +502,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if staterr != nil {
                                return fmt.Errorf("While Chmod temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
                        runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
                        if bind == runner.Container.OutputPath {
                                runner.HostOutputDir = tmpdir
@@ -480,11 +517,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        // can ensure the file is world-readable
                        // inside the container, without having to
                        // make it world-readable on the docker host.
-                       tmpdir, err := runner.MkTempDir("", "")
+                       tmpdir, err := runner.MkTempDir(runner.parentTemp, "json")
                        if err != nil {
                                return fmt.Errorf("creating temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
                        tmpfn := filepath.Join(tmpdir, "mountdata.json")
                        err = ioutil.WriteFile(tmpfn, jsondata, 0644)
                        if err != nil {
@@ -493,11 +529,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
 
                case mnt.Kind == "git_tree":
-                       tmpdir, err := runner.MkTempDir("", "")
+                       tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
                        if err != nil {
                                return fmt.Errorf("creating temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
                        err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token)
                        if err != nil {
                                return err
@@ -539,6 +574,44 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
        }
 
+       for _, cp := range copyFiles {
+               st, err := os.Stat(cp.src)
+               if err != nil {
+                       return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+               }
+               if st.IsDir() {
+                       err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+                               if walkerr != nil {
+                                       return walkerr
+                               }
+                               target := path.Join(cp.bind, walkpath[len(cp.src):])
+                               if walkinfo.Mode().IsRegular() {
+                                       copyerr := copyfile(walkpath, target)
+                                       if copyerr != nil {
+                                               return copyerr
+                                       }
+                                       return os.Chmod(target, walkinfo.Mode()|0777)
+                               } else if walkinfo.Mode().IsDir() {
+                                       mkerr := os.MkdirAll(target, 0777)
+                                       if mkerr != nil {
+                                               return mkerr
+                                       }
+                                       return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777)
+                               } else {
+                                       return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
+                               }
+                       })
+               } else if st.Mode().IsRegular() {
+                       err = copyfile(cp.src, cp.bind)
+                       if err == nil {
+                               err = os.Chmod(cp.bind, st.Mode()|0777)
+                       }
+               }
+               if err != nil {
+                       return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+               }
+       }
+
        return nil
 }
 
@@ -1102,7 +1175,7 @@ func (runner *ContainerRunner) UploadOutputFile(
        // go through mounts and try reverse map to collection reference
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
-               if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+               if (tgt == bind || strings.HasPrefix(tgt, bind+"/")) && !mnt.Writable {
                        // get path relative to bind
                        targetSuffix := tgt[len(bind):]
 
@@ -1241,7 +1314,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
                        continue
                }
 
-               if mnt.ExcludeFromOutput == true {
+               if mnt.ExcludeFromOutput == true || mnt.Writable {
                        continue
                }
 
@@ -1363,10 +1436,8 @@ func (runner *ContainerRunner) CleanupDirs() {
                }
        }
 
-       for _, tmpdir := range runner.CleanupTempDir {
-               if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
-                       runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
-               }
+       if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
+               runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", runner.parentTemp, rmerr)
        }
 }
 
@@ -1705,6 +1776,12 @@ func main() {
                os.Exit(1)
        }
 
+       parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
+       if tmperr != nil {
+               log.Fatalf("%s: %v", containerId, tmperr)
+       }
+
+       cr.parentTemp = parentTemp
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent