X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d43dce642a9681a33a5259f5bde05c8d3f3b690e..d0414ca727006b821b10b25d3920dc0f66400356:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 4a91401573..b1d3671364 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -15,6 +19,8 @@ import ( "os/signal" "path" "path/filepath" + "runtime" + "runtime/pprof" "sort" "strings" "sync" @@ -50,6 +56,7 @@ var ErrCancelled = errors.New("Cancelled") type IKeepClient interface { PutHB(hash string, buf []byte) (string, int, error) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) + ClearBlockCache() } // NewLogWriter is a factory function to create a new log writer. @@ -177,9 +184,9 @@ type ContainerRunner struct { networkMode string // passed through to HostConfig.NetworkMode } -// SetupSignals sets up signal handling to gracefully terminate the underlying +// setupSignals sets up signal handling to gracefully terminate the underlying // Docker container and update state when receiving a TERM, INT or QUIT signal. 
-func (runner *ContainerRunner) SetupSignals() { +func (runner *ContainerRunner) setupSignals() { runner.SigChan = make(chan os.Signal, 1) signal.Notify(runner.SigChan, syscall.SIGTERM) signal.Notify(runner.SigChan, syscall.SIGINT) @@ -188,7 +195,6 @@ func (runner *ContainerRunner) SetupSignals() { go func(sig chan os.Signal) { <-sig runner.stop() - signal.Stop(sig) }(runner.SigChan) } @@ -209,6 +215,13 @@ func (runner *ContainerRunner) stop() { } } +func (runner *ContainerRunner) teardown() { + if runner.SigChan != nil { + signal.Stop(runner.SigChan) + close(runner.SigChan) + } +} + // LoadImage determines the docker image id from the container record and // checks if it is available in the local Docker image store. If not, it loads // the image from Keep. @@ -254,6 +267,8 @@ func (runner *ContainerRunner) LoadImage() (err error) { runner.ContainerConfig.Image = imageID + runner.Kc.ClearBlockCache() + return nil } @@ -341,7 +356,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { needCertMount := true var binds []string - for bind, _ := range runner.Container.Mounts { + for bind := range runner.Container.Mounts { binds = append(binds, bind) } sort.Strings(binds) @@ -588,23 +603,23 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) { logger := log.New(w, "node-info", 0) commands := []infoCommand{ - infoCommand{ + { label: "Host Information", cmd: []string{"uname", "-a"}, }, - infoCommand{ + { label: "CPU Information", cmd: []string{"cat", "/proc/cpuinfo"}, }, - infoCommand{ + { label: "Memory Information", cmd: []string{"cat", "/proc/meminfo"}, }, - infoCommand{ + { label: "Disk Space", cmd: []string{"df", "-m", "/", os.TempDir()}, }, - infoCommand{ + { label: "Disk INodes", cmd: []string{"df", "-i", "/", os.TempDir()}, }, @@ -646,14 +661,11 @@ func (runner *ContainerRunner) LogContainerRecord() (err error) { return fmt.Errorf("While retrieving container record from the API server: %v", err) } defer reader.Close() - // Read the API server 
response as []byte - json_bytes, err := ioutil.ReadAll(reader) - if err != nil { - return fmt.Errorf("While reading container record API server response: %v", err) - } - // Decode the JSON []byte + + dec := json.NewDecoder(reader) + dec.UseNumber() var cr map[string]interface{} - if err = json.Unmarshal(json_bytes, &cr); err != nil { + if err = dec.Decode(&cr); err != nil { return fmt.Errorf("While decoding the container record JSON response: %v", err) } // Re-encode it using indentation to improve readability @@ -856,7 +868,11 @@ func (runner *ContainerRunner) StartContainer() error { err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID, dockertypes.ContainerStartOptions{}) if err != nil { - return fmt.Errorf("could not start container: %v", err) + var advice string + if strings.Contains(err.Error(), "no such file or directory") { + advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0]) + } + return fmt.Errorf("could not start container: %v%s", err, advice) } runner.cStarted = true return nil @@ -1298,16 +1314,17 @@ func (runner *ContainerRunner) Run() (err error) { // a new one in case we needed to log anything while // finalizing. runner.CrunchLog.Close() + + runner.teardown() }() - err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container) + err = runner.fetchContainerRecord() if err != nil { - err = fmt.Errorf("While getting container record: %v", err) return } // setup signal handling - runner.SetupSignals() + runner.setupSignals() // check for and/or load image err = runner.LoadImage() @@ -1365,6 +1382,24 @@ func (runner *ContainerRunner) Run() (err error) { return } +// Fetch the current container record (uuid = runner.Container.UUID) +// into runner.Container. 
+func (runner *ContainerRunner) fetchContainerRecord() error { + reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil) + if err != nil { + return fmt.Errorf("error fetching container record: %v", err) + } + defer reader.Close() + + dec := json.NewDecoder(reader) + dec.UseNumber() + err = dec.Decode(&runner.Container) + if err != nil { + return fmt.Errorf("error decoding container record: %v", err) + } + return nil +} + // NewContainerRunner creates a new container runner. func NewContainerRunner(api IArvadosClient, kc IKeepClient, @@ -1399,6 +1434,7 @@ func main() { networkMode := flag.String("container-network-mode", "default", `Set networking mode for container. Corresponds to Docker network mode (--net). `) + memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container") flag.Parse() containerId := flag.Arg(0) @@ -1418,6 +1454,7 @@ func main() { if err != nil { log.Fatalf("%s: %v", containerId, err) } + kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} kc.Retries = 4 var docker *dockerclient.Client @@ -1442,9 +1479,24 @@ func main() { cr.expectCgroupParent = p } - err = cr.Run() - if err != nil { - log.Fatalf("%s: %v", containerId, err) + runerr := cr.Run() + + if *memprofile != "" { + f, err := os.Create(*memprofile) + if err != nil { + log.Printf("could not create memory profile: %v", err) + } else { + runtime.GC() // get up-to-date statistics + if err := pprof.WriteHeapProfile(f); err != nil { + log.Printf("could not write memory profile: %v", err) + } + if closeerr := f.Close(); closeerr != nil { + log.Printf("closing memprofile file: %v", closeerr) + } + } } + if runerr != nil { + log.Fatalf("%s: %v", containerId, runerr) + } }