12447: Simplify BlockCache locking.
[arvados.git] / services / crunch-run / crunchrun.go
index 4a91401573f444598dc80582dabea0b3c9ba7231..b1d36713648b42acef41506ed3840519a20cc586 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -15,6 +19,8 @@ import (
        "os/signal"
        "path"
        "path/filepath"
+       "runtime"
+       "runtime/pprof"
        "sort"
        "strings"
        "sync"
@@ -50,6 +56,7 @@ var ErrCancelled = errors.New("Cancelled")
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
+       ClearBlockCache()
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -177,9 +184,9 @@ type ContainerRunner struct {
        networkMode   string // passed through to HostConfig.NetworkMode
 }
 
-// SetupSignals sets up signal handling to gracefully terminate the underlying
+// setupSignals sets up signal handling to gracefully terminate the underlying
 // Docker container and update state when receiving a TERM, INT or QUIT signal.
-func (runner *ContainerRunner) SetupSignals() {
+func (runner *ContainerRunner) setupSignals() {
        runner.SigChan = make(chan os.Signal, 1)
        signal.Notify(runner.SigChan, syscall.SIGTERM)
        signal.Notify(runner.SigChan, syscall.SIGINT)
@@ -188,7 +195,6 @@ func (runner *ContainerRunner) SetupSignals() {
        go func(sig chan os.Signal) {
                <-sig
                runner.stop()
-               signal.Stop(sig)
        }(runner.SigChan)
 }
 
@@ -209,6 +215,13 @@ func (runner *ContainerRunner) stop() {
        }
 }
 
+func (runner *ContainerRunner) teardown() {
+       if runner.SigChan != nil {
+               signal.Stop(runner.SigChan)
+               close(runner.SigChan)
+       }
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store.  If not, it loads
 // the image from Keep.
@@ -254,6 +267,8 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 
        runner.ContainerConfig.Image = imageID
 
+       runner.Kc.ClearBlockCache()
+
        return nil
 }
 
@@ -341,7 +356,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        needCertMount := true
 
        var binds []string
-       for bind, _ := range runner.Container.Mounts {
+       for bind := range runner.Container.Mounts {
                binds = append(binds, bind)
        }
        sort.Strings(binds)
@@ -588,23 +603,23 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
        logger := log.New(w, "node-info", 0)
 
        commands := []infoCommand{
-               infoCommand{
+               {
                        label: "Host Information",
                        cmd:   []string{"uname", "-a"},
                },
-               infoCommand{
+               {
                        label: "CPU Information",
                        cmd:   []string{"cat", "/proc/cpuinfo"},
                },
-               infoCommand{
+               {
                        label: "Memory Information",
                        cmd:   []string{"cat", "/proc/meminfo"},
                },
-               infoCommand{
+               {
                        label: "Disk Space",
                        cmd:   []string{"df", "-m", "/", os.TempDir()},
                },
-               infoCommand{
+               {
                        label: "Disk INodes",
                        cmd:   []string{"df", "-i", "/", os.TempDir()},
                },
@@ -646,14 +661,11 @@ func (runner *ContainerRunner) LogContainerRecord() (err error) {
                return fmt.Errorf("While retrieving container record from the API server: %v", err)
        }
        defer reader.Close()
-       // Read the API server response as []byte
-       json_bytes, err := ioutil.ReadAll(reader)
-       if err != nil {
-               return fmt.Errorf("While reading container record API server response: %v", err)
-       }
-       // Decode the JSON []byte
+
+       dec := json.NewDecoder(reader)
+       dec.UseNumber()
        var cr map[string]interface{}
-       if err = json.Unmarshal(json_bytes, &cr); err != nil {
+       if err = dec.Decode(&cr); err != nil {
                return fmt.Errorf("While decoding the container record JSON response: %v", err)
        }
        // Re-encode it using indentation to improve readability
@@ -856,7 +868,11 @@ func (runner *ContainerRunner) StartContainer() error {
        err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
                dockertypes.ContainerStartOptions{})
        if err != nil {
-               return fmt.Errorf("could not start container: %v", err)
+               var advice string
+               if strings.Contains(err.Error(), "no such file or directory") {
+                       advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
+               }
+               return fmt.Errorf("could not start container: %v%s", err, advice)
        }
        runner.cStarted = true
        return nil
@@ -1298,16 +1314,17 @@ func (runner *ContainerRunner) Run() (err error) {
                // a new one in case we needed to log anything while
                // finalizing.
                runner.CrunchLog.Close()
+
+               runner.teardown()
        }()
 
-       err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
+       err = runner.fetchContainerRecord()
        if err != nil {
-               err = fmt.Errorf("While getting container record: %v", err)
                return
        }
 
        // setup signal handling
-       runner.SetupSignals()
+       runner.setupSignals()
 
        // check for and/or load image
        err = runner.LoadImage()
@@ -1365,6 +1382,24 @@ func (runner *ContainerRunner) Run() (err error) {
        return
 }
 
+// Fetch the current container record (uuid = runner.Container.UUID)
+// into runner.Container.
+func (runner *ContainerRunner) fetchContainerRecord() error {
+       reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
+       if err != nil {
+               return fmt.Errorf("error fetching container record: %v", err)
+       }
+       defer reader.Close()
+
+       dec := json.NewDecoder(reader)
+       dec.UseNumber()
+       err = dec.Decode(&runner.Container)
+       if err != nil {
+               return fmt.Errorf("error decoding container record: %v", err)
+       }
+       return nil
+}
+
 // NewContainerRunner creates a new container runner.
 func NewContainerRunner(api IArvadosClient,
        kc IKeepClient,
@@ -1399,6 +1434,7 @@ func main() {
        networkMode := flag.String("container-network-mode", "default",
                `Set networking mode for container.  Corresponds to Docker network mode (--net).
        `)
+       memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
        flag.Parse()
 
        containerId := flag.Arg(0)
@@ -1418,6 +1454,7 @@ func main() {
        if err != nil {
                log.Fatalf("%s: %v", containerId, err)
        }
+       kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
        var docker *dockerclient.Client
@@ -1442,9 +1479,24 @@ func main() {
                cr.expectCgroupParent = p
        }
 
-       err = cr.Run()
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
+       runerr := cr.Run()
+
+       if *memprofile != "" {
+               f, err := os.Create(*memprofile)
+               if err != nil {
+                       log.Printf("could not create memory profile: ", err)
+               }
+               runtime.GC() // get up-to-date statistics
+               if err := pprof.WriteHeapProfile(f); err != nil {
+                       log.Printf("could not write memory profile: ", err)
+               }
+               closeerr := f.Close()
+               if closeerr != nil {
+                       log.Printf("closing memprofile file: ", err)
+               }
        }
 
+       if runerr != nil {
+               log.Fatalf("%s: %v", containerId, runerr)
+       }
 }