12447: Simplify BlockCache locking.
[arvados.git] / services / crunch-run / crunchrun.go
index 080a0fc71e1313ba2cfc1b40014c5ae999a24ab7..b1d36713648b42acef41506ed3840519a20cc586 100644 (file)
@@ -19,6 +19,8 @@ import (
        "os/signal"
        "path"
        "path/filepath"
+       "runtime"
+       "runtime/pprof"
        "sort"
        "strings"
        "sync"
@@ -54,6 +56,7 @@ var ErrCancelled = errors.New("Cancelled")
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
+       ClearBlockCache()
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -181,9 +184,9 @@ type ContainerRunner struct {
        networkMode   string // passed through to HostConfig.NetworkMode
 }
 
-// SetupSignals sets up signal handling to gracefully terminate the underlying
+// setupSignals sets up signal handling to gracefully terminate the underlying
 // Docker container and update state when receiving a TERM, INT or QUIT signal.
-func (runner *ContainerRunner) SetupSignals() {
+func (runner *ContainerRunner) setupSignals() {
        runner.SigChan = make(chan os.Signal, 1)
        signal.Notify(runner.SigChan, syscall.SIGTERM)
        signal.Notify(runner.SigChan, syscall.SIGINT)
@@ -192,7 +195,6 @@ func (runner *ContainerRunner) SetupSignals() {
        go func(sig chan os.Signal) {
                <-sig
                runner.stop()
-               signal.Stop(sig)
        }(runner.SigChan)
 }
 
@@ -213,6 +215,13 @@ func (runner *ContainerRunner) stop() {
        }
 }
 
+func (runner *ContainerRunner) teardown() {
+       if runner.SigChan != nil {
+               signal.Stop(runner.SigChan)
+               close(runner.SigChan)
+       }
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store.  If not, it loads
 // the image from Keep.
@@ -258,6 +267,8 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 
        runner.ContainerConfig.Image = imageID
 
+       runner.Kc.ClearBlockCache()
+
        return nil
 }
 
@@ -859,9 +870,9 @@ func (runner *ContainerRunner) StartContainer() error {
        if err != nil {
                var advice string
                if strings.Contains(err.Error(), "no such file or directory") {
-                       advice = fmt.Sprintf(" (perhaps command %q is missing, or has a missing #! interpreter, or was saved in DOS mode with cr-lf chars?)", runner.Container.Command[0])
+                       advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
                }
-               return fmt.Errorf("could not start container%s: %v", advice, err)
+               return fmt.Errorf("could not start container: %v%s", err, advice)
        }
        runner.cStarted = true
        return nil
@@ -1303,6 +1314,8 @@ func (runner *ContainerRunner) Run() (err error) {
                // a new one in case we needed to log anything while
                // finalizing.
                runner.CrunchLog.Close()
+
+               runner.teardown()
        }()
 
        err = runner.fetchContainerRecord()
@@ -1311,7 +1324,7 @@ func (runner *ContainerRunner) Run() (err error) {
        }
 
        // setup signal handling
-       runner.SetupSignals()
+       runner.setupSignals()
 
        // check for and/or load image
        err = runner.LoadImage()
@@ -1421,6 +1434,7 @@ func main() {
        networkMode := flag.String("container-network-mode", "default",
                `Set networking mode for container.  Corresponds to Docker network mode (--net).
        `)
+       memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
        flag.Parse()
 
        containerId := flag.Arg(0)
@@ -1440,6 +1454,7 @@ func main() {
        if err != nil {
                log.Fatalf("%s: %v", containerId, err)
        }
+       kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
        var docker *dockerclient.Client
@@ -1464,9 +1479,24 @@ func main() {
                cr.expectCgroupParent = p
        }
 
-       err = cr.Run()
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
+       runerr := cr.Run()
+
+       if *memprofile != "" {
+               f, err := os.Create(*memprofile)
+               if err != nil {
+                       log.Printf("could not create memory profile: ", err)
+               }
+               runtime.GC() // get up-to-date statistics
+               if err := pprof.WriteHeapProfile(f); err != nil {
+                       log.Printf("could not write memory profile: ", err)
+               }
+               closeerr := f.Close()
+               if closeerr != nil {
+                       log.Printf("closing memprofile file: ", err)
+               }
        }
 
+       if runerr != nil {
+               log.Fatalf("%s: %v", containerId, runerr)
+       }
 }