Merge branch '12840-cancelled-pipeline'
[arvados.git] / services / crunch-run / crunchrun.go
index a249273095c5909e49ed83d6148cdd54a2811386..653e0b4949da882cbcc185894ffdaee369c19318 100644 (file)
@@ -103,21 +103,23 @@ type ContainerRunner struct {
        LogsPDH       *string
        RunArvMount
        MkTempDir
-       ArvMount       *exec.Cmd
-       ArvMountPoint  string
-       HostOutputDir  string
-       CleanupTempDir []string
-       Binds          []string
-       Volumes        map[string]struct{}
-       OutputPDH      *string
-       SigChan        chan os.Signal
-       ArvMountExit   chan error
-       finalState     string
-
-       statLogger   io.WriteCloser
-       statReporter *crunchstat.Reporter
-       statInterval time.Duration
-       cgroupRoot   string
+       ArvMount      *exec.Cmd
+       ArvMountPoint string
+       HostOutputDir string
+       Binds         []string
+       Volumes       map[string]struct{}
+       OutputPDH     *string
+       SigChan       chan os.Signal
+       ArvMountExit  chan error
+       finalState    string
+       parentTemp    string
+
+       statLogger       io.WriteCloser
+       statReporter     *crunchstat.Reporter
+       hoststatLogger   io.WriteCloser
+       hoststatReporter *crunchstat.Reporter
+       statInterval     time.Duration
+       cgroupRoot       string
        // What we expect the container's cgroup parent to be.
        expectCgroupParent string
        // What we tell docker to use as the container's cgroup
@@ -133,7 +135,6 @@ type ContainerRunner struct {
        setCgroupParent string
 
        cStateLock sync.Mutex
-       cStarted   bool // StartContainer() succeeded
        cCancelled bool // StopContainer() invoked
 
        enableNetwork string // one of "default" or "always"
@@ -151,17 +152,19 @@ func (runner *ContainerRunner) setupSignals() {
 
        go func(sig chan os.Signal) {
                for s := range sig {
-                       runner.CrunchLog.Printf("caught signal: %v", s)
-                       runner.stop()
+                       runner.stop(s)
                }
        }(runner.SigChan)
 }
 
 // stop the underlying Docker container.
-func (runner *ContainerRunner) stop() {
+func (runner *ContainerRunner) stop(sig os.Signal) {
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
-       if !runner.cStarted {
+       if sig != nil {
+               runner.CrunchLog.Printf("caught signal: %v", sig)
+       }
+       if runner.ContainerID == "" {
                return
        }
        runner.cCancelled = true
@@ -172,12 +175,6 @@ func (runner *ContainerRunner) stop() {
        }
 }
 
-func (runner *ContainerRunner) stopSignals() {
-       if runner.SigChan != nil {
-               signal.Stop(runner.SigChan)
-       }
-}
-
 var errorBlacklist = []string{
        "(?ms).*[Cc]annot connect to the Docker daemon.*",
        "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
@@ -326,11 +323,42 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
 
 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
        if runner.ArvMountPoint == "" {
-               runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
+               runner.ArvMountPoint, err = runner.MkTempDir(runner.parentTemp, prefix)
        }
        return
 }
 
+func copyfile(src string, dst string) (err error) {
+       srcfile, err := os.Open(src)
+       if err != nil {
+               return
+       }
+
+       os.MkdirAll(path.Dir(dst), 0777)
+
+       dstfile, err := os.Create(dst)
+       if err != nil {
+               return
+       }
+       _, err = io.Copy(dstfile, srcfile)
+       if err != nil {
+               return
+       }
+
+       err = srcfile.Close()
+       err2 := dstfile.Close()
+
+       if err != nil {
+               return
+       }
+
+       if err2 != nil {
+               return err2
+       }
+
+       return nil
+}
+
 func (runner *ContainerRunner) SetupMounts() (err error) {
        err = runner.SetupArvMountPoint("keep")
        if err != nil {
@@ -358,6 +386,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        runner.Binds = nil
        runner.Volumes = make(map[string]struct{})
        needCertMount := true
+       type copyFile struct {
+               src  string
+               bind string
+       }
+       var copyFiles []copyFile
 
        var binds []string
        for bind := range runner.Container.Mounts {
@@ -413,7 +446,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                pdhOnly = false
                                src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
                        } else if mnt.PortableDataHash != "" {
-                               if mnt.Writable {
+                               if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
                                        return fmt.Errorf("Can never write to a collection specified by portable data hash")
                                }
                                idx := strings.Index(mnt.PortableDataHash, "/")
@@ -440,10 +473,12 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if mnt.Writable {
                                if bind == runner.Container.OutputPath {
                                        runner.HostOutputDir = src
+                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                                } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
-                                       return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
+                                       copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
+                               } else {
+                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                                }
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                        } else {
                                runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
                        }
@@ -451,7 +486,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
                case mnt.Kind == "tmp":
                        var tmpdir string
-                       tmpdir, err = runner.MkTempDir("", "")
+                       tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
                        if err != nil {
                                return fmt.Errorf("While creating mount temp dir: %v", err)
                        }
@@ -463,7 +498,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if staterr != nil {
                                return fmt.Errorf("While Chmod temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
                        runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
                        if bind == runner.Container.OutputPath {
                                runner.HostOutputDir = tmpdir
@@ -479,11 +513,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        // can ensure the file is world-readable
                        // inside the container, without having to
                        // make it world-readable on the docker host.
-                       tmpdir, err := runner.MkTempDir("", "")
+                       tmpdir, err := runner.MkTempDir(runner.parentTemp, "json")
                        if err != nil {
                                return fmt.Errorf("creating temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
                        tmpfn := filepath.Join(tmpdir, "mountdata.json")
                        err = ioutil.WriteFile(tmpfn, jsondata, 0644)
                        if err != nil {
@@ -492,11 +525,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
 
                case mnt.Kind == "git_tree":
-                       tmpdir, err := runner.MkTempDir("", "")
+                       tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
                        if err != nil {
                                return fmt.Errorf("creating temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
                        err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token)
                        if err != nil {
                                return err
@@ -538,59 +570,118 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
        }
 
+       for _, cp := range copyFiles {
+               st, err := os.Stat(cp.src)
+               if err != nil {
+                       return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+               }
+               if st.IsDir() {
+                       err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+                               if walkerr != nil {
+                                       return walkerr
+                               }
+                               target := path.Join(cp.bind, walkpath[len(cp.src):])
+                               if walkinfo.Mode().IsRegular() {
+                                       copyerr := copyfile(walkpath, target)
+                                       if copyerr != nil {
+                                               return copyerr
+                                       }
+                                       return os.Chmod(target, walkinfo.Mode()|0777)
+                               } else if walkinfo.Mode().IsDir() {
+                                       mkerr := os.MkdirAll(target, 0777)
+                                       if mkerr != nil {
+                                               return mkerr
+                                       }
+                                       return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777)
+                               } else {
+                                       return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
+                               }
+                       })
+               } else if st.Mode().IsRegular() {
+                       err = copyfile(cp.src, cp.bind)
+                       if err == nil {
+                               err = os.Chmod(cp.bind, st.Mode()|0777)
+                       }
+               }
+               if err != nil {
+                       return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+               }
+       }
+
        return nil
 }
 
 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
        // Handle docker log protocol
        // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
+       defer close(runner.loggingDone)
 
        header := make([]byte, 8)
-       for {
-               _, readerr := io.ReadAtLeast(containerReader, header, 8)
-
-               if readerr == nil {
-                       readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
-                       if header[0] == 1 {
-                               // stdout
-                               _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
-                       } else {
-                               // stderr
-                               _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
+       var err error
+       for err == nil {
+               _, err = io.ReadAtLeast(containerReader, header, 8)
+               if err != nil {
+                       if err == io.EOF {
+                               err = nil
                        }
+                       break
                }
+               readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
+               if header[0] == 1 {
+                       // stdout
+                       _, err = io.CopyN(runner.Stdout, containerReader, readsize)
+               } else {
+                       // stderr
+                       _, err = io.CopyN(runner.Stderr, containerReader, readsize)
+               }
+       }
 
-               if readerr != nil {
-                       if readerr != io.EOF {
-                               runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
-                       }
-
-                       closeerr := runner.Stdout.Close()
-                       if closeerr != nil {
-                               runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
-                       }
+       if err != nil {
+               runner.CrunchLog.Printf("error reading docker logs: %v", err)
+       }
 
-                       closeerr = runner.Stderr.Close()
-                       if closeerr != nil {
-                               runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
-                       }
+       err = runner.Stdout.Close()
+       if err != nil {
+               runner.CrunchLog.Printf("error closing stdout logs: %v", err)
+       }
 
-                       if runner.statReporter != nil {
-                               runner.statReporter.Stop()
-                               closeerr = runner.statLogger.Close()
-                               if closeerr != nil {
-                                       runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
-                               }
-                       }
+       err = runner.Stderr.Close()
+       if err != nil {
+               runner.CrunchLog.Printf("error closing stderr logs: %v", err)
+       }
 
-                       runner.loggingDone <- true
-                       close(runner.loggingDone)
-                       return
+       if runner.statReporter != nil {
+               runner.statReporter.Stop()
+               err = runner.statLogger.Close()
+               if err != nil {
+                       runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
                }
        }
 }
 
-func (runner *ContainerRunner) StartCrunchstat() {
+func (runner *ContainerRunner) stopHoststat() error {
+       if runner.hoststatReporter == nil {
+               return nil
+       }
+       runner.hoststatReporter.Stop()
+       err := runner.hoststatLogger.Close()
+       if err != nil {
+               return fmt.Errorf("error closing hoststat logs: %v", err)
+       }
+       return nil
+}
+
+func (runner *ContainerRunner) startHoststat() {
+       runner.hoststatLogger = NewThrottledLogger(runner.NewLogWriter("hoststat"))
+       runner.hoststatReporter = &crunchstat.Reporter{
+               Logger:     log.New(runner.hoststatLogger, "", 0),
+               CgroupRoot: runner.cgroupRoot,
+               PollPeriod: runner.statInterval,
+       }
+       runner.hoststatReporter.Start()
+}
+
+func (runner *ContainerRunner) startCrunchstat() {
        runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
        runner.statReporter = &crunchstat.Reporter{
                CID:          runner.ContainerID,
@@ -796,7 +887,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                        _, err := io.Copy(response.Conn, stdinRdr)
                        if err != nil {
                                runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
-                               runner.stop()
+                               runner.stop(nil)
                        }
                        stdinRdr.Close()
                        response.CloseWrite()
@@ -806,7 +897,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                        _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
                        if err != nil {
                                runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
-                               runner.stop()
+                               runner.stop(nil)
                        }
                        response.CloseWrite()
                }()
@@ -857,6 +948,7 @@ func (runner *ContainerRunner) CreateContainer() error {
 
        runner.ContainerConfig.Volumes = runner.Volumes
 
+       maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
        runner.HostConfig = dockercontainer.HostConfig{
                Binds: runner.Binds,
                LogConfig: dockercontainer.LogConfig{
@@ -864,6 +956,10 @@ func (runner *ContainerRunner) CreateContainer() error {
                },
                Resources: dockercontainer.Resources{
                        CgroupParent: runner.setCgroupParent,
+                       NanoCPUs:     int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000,
+                       Memory:       maxRAM, // RAM
+                       MemorySwap:   maxRAM, // RAM+swap
+                       KernelMemory: maxRAM, // kernel portion
                },
        }
 
@@ -920,46 +1016,38 @@ func (runner *ContainerRunner) StartContainer() error {
                }
                return fmt.Errorf("could not start container: %v%s", err, advice)
        }
-       runner.cStarted = true
        return nil
 }
 
 // WaitFinish waits for the container to terminate, capture the exit code, and
 // close the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() (err error) {
+func (runner *ContainerRunner) WaitFinish() error {
        runner.CrunchLog.Print("Waiting for container to finish")
 
        waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+       arvMountExit := runner.ArvMountExit
+       for {
+               select {
+               case waitBody := <-waitOk:
+                       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+                       code := int(waitBody.StatusCode)
+                       runner.ExitCode = &code
+
+                       // wait for stdout/stderr to complete
+                       <-runner.loggingDone
+                       return nil
 
-       go func() {
-               <-runner.ArvMountExit
-               if runner.cStarted {
+               case err := <-waitErr:
+                       return fmt.Errorf("container wait: %v", err)
+
+               case <-arvMountExit:
                        runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
-                       runner.stop()
+                       runner.stop(nil)
+                       // arvMountExit will always be ready now that
+                       // it's closed, but that doesn't interest us.
+                       arvMountExit = nil
                }
-       }()
-
-       var waitBody dockercontainer.ContainerWaitOKBody
-       select {
-       case waitBody = <-waitOk:
-       case err = <-waitErr:
-       }
-
-       // Container isn't running any more
-       runner.cStarted = false
-
-       if err != nil {
-               return fmt.Errorf("container wait: %v", err)
        }
-
-       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
-       code := int(waitBody.StatusCode)
-       runner.ExitCode = &code
-
-       // wait for stdout/stderr to complete
-       <-runner.loggingDone
-
-       return nil
 }
 
 var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
@@ -1088,7 +1176,7 @@ func (runner *ContainerRunner) UploadOutputFile(
        // go through mounts and try reverse map to collection reference
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
-               if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+               if (tgt == bind || strings.HasPrefix(tgt, bind+"/")) && !mnt.Writable {
                        // get path relative to bind
                        targetSuffix := tgt[len(bind):]
 
@@ -1227,7 +1315,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
                        continue
                }
 
-               if mnt.ExcludeFromOutput == true {
+               if mnt.ExcludeFromOutput == true || mnt.Writable {
                        continue
                }
 
@@ -1349,29 +1437,33 @@ func (runner *ContainerRunner) CleanupDirs() {
                }
        }
 
-       for _, tmpdir := range runner.CleanupTempDir {
-               if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
-                       runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
-               }
+       if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
+               runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", runner.parentTemp, rmerr)
        }
 }
 
 // CommitLogs posts the collection containing the final container logs.
 func (runner *ContainerRunner) CommitLogs() error {
-       runner.CrunchLog.Print(runner.finalState)
+       func() {
+               // Hold cStateLock to prevent races on CrunchLog (e.g., stop()).
+               runner.cStateLock.Lock()
+               defer runner.cStateLock.Unlock()
 
-       if runner.arvMountLog != nil {
-               runner.arvMountLog.Close()
-       }
-       runner.CrunchLog.Close()
+               runner.CrunchLog.Print(runner.finalState)
+
+               if runner.arvMountLog != nil {
+                       runner.arvMountLog.Close()
+               }
+               runner.CrunchLog.Close()
 
-       // Closing CrunchLog above allows them to be committed to Keep at this
-       // point, but re-open crunch log with ArvClient in case there are any
-       // other further errors (such as failing to write the log to Keep!)
-       // while shutting down
-       runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
-               UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
-       runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+               // Closing CrunchLog above allows them to be committed to Keep at this
+               // point, but re-open crunch log with ArvClient in case there are any
+               // other further errors (such as failing to write the log to Keep!)
+               // while shutting down
+               runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+                       UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
+               runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+       }()
 
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
@@ -1480,7 +1572,6 @@ func (runner *ContainerRunner) Run() (err error) {
        runner.finalState = "Queued"
 
        defer func() {
-               runner.stopSignals()
                runner.CleanupDirs()
 
                runner.CrunchLog.Printf("crunch-run finished")
@@ -1522,6 +1613,7 @@ func (runner *ContainerRunner) Run() (err error) {
                }
 
                checkErr(runner.CaptureOutput())
+               checkErr(runner.stopHoststat())
                checkErr(runner.CommitLogs())
                checkErr(runner.UpdateContainerFinal())
        }()
@@ -1530,9 +1622,8 @@ func (runner *ContainerRunner) Run() (err error) {
        if err != nil {
                return
        }
-
-       // setup signal handling
        runner.setupSignals()
+       runner.startHoststat()
 
        // check for and/or load image
        err = runner.LoadImage()
@@ -1581,7 +1672,7 @@ func (runner *ContainerRunner) Run() (err error) {
        }
        runner.finalState = "Cancelled"
 
-       runner.StartCrunchstat()
+       runner.startCrunchstat()
 
        err = runner.StartContainer()
        if err != nil {
@@ -1691,6 +1782,12 @@ func main() {
                os.Exit(1)
        }
 
+       parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
+       if tmperr != nil {
+               log.Fatalf("%s: %v", containerId, tmperr)
+       }
+
+       cr.parentTemp = parentTemp
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent