X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c64d7e1d5d1879187e1c4002445fab1d3c7951a0..92656b214ed120c631bc6adab3b35992939e2ced:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index c6847bc19d..a957874f91 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -15,6 +19,8 @@ import ( "os/signal" "path" "path/filepath" + "runtime" + "runtime/pprof" "sort" "strings" "sync" @@ -33,6 +39,8 @@ import ( dockerclient "github.com/docker/docker/client" ) +var version = "dev" + // IArvadosClient is the minimal Arvados API methods used by crunch-run. type IArvadosClient interface { Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error @@ -49,7 +57,8 @@ var ErrCancelled = errors.New("Cancelled") // IKeepClient is the minimal Keep API methods used by crunch-run. type IKeepClient interface { PutHB(hash string, buf []byte) (string, int, error) - ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) + ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) + ClearBlockCache() } // NewLogWriter is a factory function to create a new log writer. @@ -66,7 +75,7 @@ type ThinDockerClient interface { networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error ContainerStop(ctx context.Context, container string, timeout *time.Duration) error - ContainerWait(ctx context.Context, container string) (int64, error) + ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) @@ -100,8 +109,8 @@ func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container } // ContainerWait invokes dockerclient.Client.ContainerWait -func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) { - return proxy.Docker.ContainerWait(ctx, container) +func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) { + return proxy.Docker.ContainerWait(ctx, container, condition) } // ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw @@ -175,20 +184,23 @@ type ContainerRunner struct { enableNetwork string // one of "default" or "always" networkMode string // passed through to HostConfig.NetworkMode + arvMountLog *ThrottledLogger } -// SetupSignals sets up signal handling to gracefully terminate the underlying +// setupSignals sets up signal handling to gracefully terminate the underlying // Docker container and update state when receiving a TERM, INT or QUIT signal. -func (runner *ContainerRunner) SetupSignals() { +func (runner *ContainerRunner) setupSignals() { runner.SigChan = make(chan os.Signal, 1) signal.Notify(runner.SigChan, syscall.SIGTERM) signal.Notify(runner.SigChan, syscall.SIGINT) signal.Notify(runner.SigChan, syscall.SIGQUIT) go func(sig chan os.Signal) { - <-sig + s := <-sig + if s != nil { + runner.CrunchLog.Printf("Caught signal %v", s) + } runner.stop() - signal.Stop(sig) }(runner.SigChan) } @@ -204,8 +216,17 @@ func (runner *ContainerRunner) stop() { timeout := time.Duration(10) err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout)) if err != nil { - log.Printf("StopContainer failed: %s", err) + runner.CrunchLog.Printf("StopContainer failed: %s", err) } + // Suppress multiple calls to stop() + runner.cStarted = false + } +} + +func (runner *ContainerRunner) stopSignals() { + if runner.SigChan != nil { + signal.Stop(runner.SigChan) + close(runner.SigChan) } } @@ -243,17 +264,25 @@ func (runner *ContainerRunner) LoadImage() (err error) { return fmt.Errorf("While creating ManifestFileReader for container image: %v", err) } - response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, false) + response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true) if err != nil { return fmt.Errorf("While loading container image into Docker: %v", err) } - response.Body.Close() + + defer response.Body.Close() + rbody, err := ioutil.ReadAll(response.Body) + if err != nil { + return fmt.Errorf("Reading response to image load: %v", err) + } + runner.CrunchLog.Printf("Docker response: %s", rbody) } else { runner.CrunchLog.Print("Docker image is available") } runner.ContainerConfig.Image = imageID + runner.Kc.ClearBlockCache() + return nil } @@ -270,9 +299,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) ( } c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token) - nt := NewThrottledLogger(runner.NewLogWriter("arv-mount")) - c.Stdout = nt - c.Stderr = nt + runner.arvMountLog = NewThrottledLogger(runner.NewLogWriter("arv-mount")) + c.Stdout = runner.arvMountLog + c.Stderr = runner.arvMountLog + + runner.CrunchLog.Printf("Running %v", c.Args) err = c.Start() if err != nil { @@ -296,7 +327,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) ( }() go func() { - runner.ArvMountExit <- c.Wait() + mnterr := c.Wait() + if mnterr != nil { + runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr) + } + runner.ArvMountExit <- mnterr close(runner.ArvMountExit) }() @@ -325,11 +360,13 @@ func (runner *ContainerRunner) SetupMounts() (err error) { return fmt.Errorf("While creating keep mount temp dir: %v", err) } - runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint) - pdhOnly := true tmpcount := 0 - arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"} + arvMountCmd := []string{ + "--foreground", + "--allow-other", + "--read-write", + fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())} if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 { arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM)) @@ -341,7 +378,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { needCertMount := true var binds []string - for bind, _ := range runner.Container.Mounts { + for bind := range runner.Container.Mounts { binds = append(binds, bind) } sort.Strings(binds) @@ -581,30 +618,30 @@ type infoCommand struct { cmd []string } -// Gather node information and store it on the log for debugging +// LogNodeInfo gathers node information and store it on the log for debugging // purposes. func (runner *ContainerRunner) LogNodeInfo() (err error) { w := runner.NewLogWriter("node-info") logger := log.New(w, "node-info", 0) commands := []infoCommand{ - infoCommand{ + { label: "Host Information", cmd: []string{"uname", "-a"}, }, - infoCommand{ + { label: "CPU Information", cmd: []string{"cat", "/proc/cpuinfo"}, }, - infoCommand{ + { label: "Memory Information", cmd: []string{"cat", "/proc/meminfo"}, }, - infoCommand{ + { label: "Disk Space", cmd: []string{"df", "-m", "/", os.TempDir()}, }, - infoCommand{ + { label: "Disk INodes", cmd: []string{"df", "-i", "/", os.TempDir()}, }, @@ -631,28 +668,26 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) { return nil } -// Get and save the raw JSON container record from the API server +// LogContainerRecord gets and saves the raw JSON container record from the API server func (runner *ContainerRunner) LogContainerRecord() (err error) { w := &ArvLogWriter{ - runner.ArvClient, - runner.Container.UUID, - "container", - runner.LogCollection.Open("container.json"), + ArvClient: runner.ArvClient, + UUID: runner.Container.UUID, + loggingStream: "container", + writeCloser: runner.LogCollection.Open("container.json"), } + // Get Container record JSON from the API Server reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil) if err != nil { return fmt.Errorf("While retrieving container record from the API server: %v", err) } defer reader.Close() - // Read the API server response as []byte - json_bytes, err := ioutil.ReadAll(reader) - if err != nil { - return fmt.Errorf("While reading container record API server response: %v", err) - } - // Decode the JSON []byte + + dec := json.NewDecoder(reader) + dec.UseNumber() var cr map[string]interface{} - if err = json.Unmarshal(json_bytes, &cr); err != nil { + if err = dec.Decode(&cr); err != nil { return fmt.Errorf("While decoding the container record JSON response: %v", err) } // Re-encode it using indentation to improve readability @@ -675,7 +710,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) { runner.CrunchLog.Print("Attaching container streams") // If stdin mount is provided, attach it to the docker container - var stdinRdr keepclient.Reader + var stdinRdr arvados.File var stdinJson []byte if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok { if stdinMnt.Kind == "collection" { @@ -768,14 +803,14 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { if err != nil { return nil, fmt.Errorf("While Stat on temp dir: %v", err) } - stdoutPath := path.Join(runner.HostOutputDir, subdirs) + stdoutPath := filepath.Join(runner.HostOutputDir, subdirs) err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777) if err != nil { return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err) } } } - stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath)) + stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, stdoutPath)) if err != nil { return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err) } @@ -799,11 +834,13 @@ func (runner *ContainerRunner) CreateContainer() error { runner.ContainerConfig.Volumes = runner.Volumes runner.HostConfig = dockercontainer.HostConfig{ - Binds: runner.Binds, - Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent), + Binds: runner.Binds, LogConfig: dockercontainer.LogConfig{ Type: "none", }, + Resources: dockercontainer.Resources{ + CgroupParent: runner.setCgroupParent, + }, } if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI { @@ -853,7 +890,11 @@ func (runner *ContainerRunner) StartContainer() error { err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID, dockertypes.ContainerStartOptions{}) if err != nil { - return fmt.Errorf("could not start container: %v", err) + var advice string + if strings.Contains(err.Error(), "no such file or directory") { + advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0]) + } + return fmt.Errorf("could not start container: %v%s", err, advice) } runner.cStarted = true return nil @@ -861,33 +902,189 @@ func (runner *ContainerRunner) StartContainer() error { // WaitFinish waits for the container to terminate, capture the exit code, and // close the stdout/stderr logging. -func (runner *ContainerRunner) WaitFinish() error { +func (runner *ContainerRunner) WaitFinish() (err error) { runner.CrunchLog.Print("Waiting for container to finish") - waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID) + waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running") + + go func() { + <-runner.ArvMountExit + if runner.cStarted { + runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.") + runner.stop() + } + }() + + var waitBody dockercontainer.ContainerWaitOKBody + select { + case waitBody = <-waitOk: + case err = <-waitErr: + } + + // Container isn't running any more + runner.cStarted = false + if err != nil { return fmt.Errorf("container wait: %v", err) } - runner.CrunchLog.Printf("Container exited with code: %v", waitDocker) - code := int(waitDocker) + runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode) + code := int(waitBody.StatusCode) runner.ExitCode = &code - waitMount := runner.ArvMountExit - select { - case err := <-waitMount: - runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err) - waitMount = nil - runner.stop() - default: - } - // wait for stdout/stderr to complete <-runner.loggingDone return nil } +var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory") + +func (runner *ContainerRunner) derefOutputSymlink(path string, startinfo os.FileInfo) (tgt string, readlinktgt string, info os.FileInfo, err error) { + // Follow symlinks if necessary + info = startinfo + tgt = path + readlinktgt = "" + nextlink := path + for followed := 0; info.Mode()&os.ModeSymlink != 0; followed++ { + if followed >= limitFollowSymlinks { + // Got stuck in a loop or just a pathological number of links, give up. + err = fmt.Errorf("Followed more than %v symlinks from path %q", limitFollowSymlinks, path) + return + } + + readlinktgt, err = os.Readlink(nextlink) + if err != nil { + return + } + + tgt = readlinktgt + if !strings.HasPrefix(tgt, "/") { + // Relative symlink, resolve it to host path + tgt = filepath.Join(filepath.Dir(path), tgt) + } + if strings.HasPrefix(tgt, runner.Container.OutputPath+"/") && !strings.HasPrefix(tgt, runner.HostOutputDir+"/") { + // Absolute symlink to container output path, adjust it to host output path. + tgt = filepath.Join(runner.HostOutputDir, tgt[len(runner.Container.OutputPath):]) + } + if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") { + // After dereferencing, symlink target must either be + // within output directory, or must point to a + // collection mount. + err = ErrNotInOutputDir + return + } + + info, err = os.Lstat(tgt) + if err != nil { + // tgt + err = fmt.Errorf("Symlink in output %q points to invalid location %q: %v", + path[len(runner.HostOutputDir):], readlinktgt, err) + return + } + + nextlink = tgt + } + + return +} + +var limitFollowSymlinks = 10 + +// UploadFile uploads files within the output directory, with special handling +// for symlinks. If the symlink leads to a keep mount, copy the manifest text +// from the keep mount into the output manifestText. Ensure that whether +// symlinks are relative or absolute, every symlink target (even targets that +// are symlinks themselves) must point to a path in either the output directory +// or a collection mount. +// +// Assumes initial value of "path" is absolute, and located within runner.HostOutputDir. +func (runner *ContainerRunner) UploadOutputFile( + path string, + info os.FileInfo, + infoerr error, + binds []string, + walkUpload *WalkUpload, + relocateFrom string, + relocateTo string, + followed int) (manifestText string, err error) { + + if info.Mode().IsDir() { + return + } + + if infoerr != nil { + return "", infoerr + } + + if followed >= limitFollowSymlinks { + // Got stuck in a loop or just a pathological number of + // directory links, give up. + err = fmt.Errorf("Followed more than %v symlinks from path %q", limitFollowSymlinks, path) + return + } + + // When following symlinks, the source path may need to be logically + // relocated to some other path within the output collection. Remove + // the relocateFrom prefix and replace it with relocateTo. + relocated := relocateTo + path[len(relocateFrom):] + + tgt, readlinktgt, info, derefErr := runner.derefOutputSymlink(path, info) + if derefErr != nil && derefErr != ErrNotInOutputDir { + return "", derefErr + } + + // go through mounts and try reverse map to collection reference + for _, bind := range binds { + mnt := runner.Container.Mounts[bind] + if tgt == bind || strings.HasPrefix(tgt, bind+"/") { + // get path relative to bind + targetSuffix := tgt[len(bind):] + + // Copy mount and adjust the path to add path relative to the bind + adjustedMount := mnt + adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix) + + // Terminates in this keep mount, so add the + // manifest text at appropriate location. + outputSuffix := path[len(runner.HostOutputDir):] + manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix) + return + } + } + + // If target is not a collection mount, it must be located within the + // output directory, otherwise it is an error. + if derefErr == ErrNotInOutputDir { + err = fmt.Errorf("Symlink in output %q points to invalid location %q, must point to path within the output directory.", + path[len(runner.HostOutputDir):], readlinktgt) + return + } + + if info.Mode().IsRegular() { + return "", walkUpload.UploadFile(relocated, tgt) + } + + if info.Mode().IsDir() { + // Symlink leads to directory. Walk() doesn't follow + // directory symlinks, so we walk the target directory + // instead. Within the walk, file paths are relocated + // so they appear under the original symlink path. + err = filepath.Walk(tgt, func(walkpath string, walkinfo os.FileInfo, walkerr error) error { + var m string + m, walkerr = runner.UploadOutputFile(walkpath, walkinfo, walkerr, + binds, walkUpload, tgt, relocated, followed+1) + if walkerr == nil { + manifestText = manifestText + m + } + return walkerr + }) + return + } + + return +} + // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories func (runner *ContainerRunner) CaptureOutput() error { if runner.finalState != "Complete" { @@ -918,14 +1115,42 @@ func (runner *ContainerRunner) CaptureOutput() error { return fmt.Errorf("While checking host output path: %v", err) } + // Pre-populate output from the configured mount points + var binds []string + for bind, mnt := range runner.Container.Mounts { + if mnt.Kind == "collection" { + binds = append(binds, bind) + } + } + sort.Strings(binds) + var manifestText string collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir) _, err = os.Stat(collectionMetafile) if err != nil { // Regular directory + cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}} - manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger) + walkUpload := cw.BeginUpload(runner.HostOutputDir, runner.CrunchLog.Logger) + + var m string + err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error { + m, err = runner.UploadOutputFile(path, info, err, binds, walkUpload, "", "", 0) + if err == nil { + manifestText = manifestText + m + } + return err + }) + + cw.EndUpload(walkUpload) + + if err != nil { + return fmt.Errorf("While uploading output files: %v", err) + } + + m, err = cw.ManifestText() + manifestText = manifestText + m if err != nil { return fmt.Errorf("While uploading output files: %v", err) } @@ -945,13 +1170,6 @@ func (runner *ContainerRunner) CaptureOutput() error { manifestText = rec.ManifestText } - // Pre-populate output from the configured mount points - var binds []string - for bind, _ := range runner.Container.Mounts { - binds = append(binds, bind) - } - sort.Strings(binds) - for _, bind := range binds { mnt := runner.Container.Mounts[bind] @@ -1037,21 +1255,55 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b func (runner *ContainerRunner) CleanupDirs() { if runner.ArvMount != nil { - umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint) - umnterr := umount.Run() + var delay int64 = 8 + umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint) + umount.Stdout = runner.CrunchLog + umount.Stderr = runner.CrunchLog + runner.CrunchLog.Printf("Running %v", umount.Args) + umnterr := umount.Start() + if umnterr != nil { - runner.CrunchLog.Printf("While running fusermount: %v", umnterr) + runner.CrunchLog.Printf("Error unmounting: %v", umnterr) + } else { + // If arv-mount --unmount gets stuck for any reason, we + // don't want to wait for it forever. Do Wait() in a goroutine + // so it doesn't block crunch-run. + umountExit := make(chan error) + go func() { + mnterr := umount.Wait() + if mnterr != nil { + runner.CrunchLog.Printf("Error unmounting: %v", mnterr) + } + umountExit <- mnterr + }() + + for again := true; again; { + again = false + select { + case <-umountExit: + umount = nil + again = true + case <-runner.ArvMountExit: + break + case <-time.After(time.Duration((delay + 1) * int64(time.Second))): + runner.CrunchLog.Printf("Timed out waiting for unmount") + if umount != nil { + umount.Process.Kill() + } + runner.ArvMount.Process.Kill() + } + } } + } - mnterr := <-runner.ArvMountExit - if mnterr != nil { - runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr) + if runner.ArvMountPoint != "" { + if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil { + runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr) } } for _, tmpdir := range runner.CleanupTempDir { - rmerr := os.RemoveAll(tmpdir) - if rmerr != nil { + if rmerr := os.RemoveAll(tmpdir); rmerr != nil { runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr) } } @@ -1060,14 +1312,19 @@ func (runner *ContainerRunner) CleanupDirs() { // CommitLogs posts the collection containing the final container logs. func (runner *ContainerRunner) CommitLogs() error { runner.CrunchLog.Print(runner.finalState) + + if runner.arvMountLog != nil { + runner.arvMountLog.Close() + } runner.CrunchLog.Close() - // Closing CrunchLog above allows it to be committed to Keep at this + // Closing CrunchLog above allows them to be committed to Keep at this // point, but re-open crunch log with ArvClient in case there are any - // other further (such as failing to write the log to Keep!) while - // shutting down - runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID, - "crunch-run", nil}) + // other further errors (such as failing to write the log to Keep!) + // while shutting down + runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient, + UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil}) + runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0) if runner.LogsPDH != nil { // If we have already assigned something to LogsPDH, @@ -1154,11 +1411,16 @@ func (runner *ContainerRunner) IsCancelled() bool { // NewArvLogWriter creates an ArvLogWriter func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser { - return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")} + return &ArvLogWriter{ + ArvClient: runner.ArvClient, + UUID: runner.Container.UUID, + loggingStream: name, + writeCloser: runner.LogCollection.Open(name + ".txt")} } // Run the full container lifecycle. func (runner *ContainerRunner) Run() (err error) { + runner.CrunchLog.Printf("crunch-run %q started", version) runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID) hostname, hosterr := os.Hostname() @@ -1168,12 +1430,16 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("Executing on host '%s'", hostname) } - // Clean up temporary directories _after_ finalizing - // everything (if we've made any by then) - defer runner.CleanupDirs() - runner.finalState = "Queued" + defer func() { + runner.stopSignals() + runner.CleanupDirs() + + runner.CrunchLog.Printf("crunch-run finished") + runner.CrunchLog.Close() + }() + defer func() { // checkErr prints e (unless it's nil) and sets err to // e (unless err is already non-nil). Thus, if err @@ -1188,13 +1454,16 @@ func (runner *ContainerRunner) Run() (err error) { if err == nil { err = e } + if runner.finalState == "Complete" { + // There was an error in the finalization. + runner.finalState = "Cancelled" + } } // Log the error encountered in Run(), if any checkErr(err) if runner.finalState == "Queued" { - runner.CrunchLog.Close() runner.UpdateContainerFinal() return } @@ -1208,21 +1477,15 @@ func (runner *ContainerRunner) Run() (err error) { checkErr(runner.CaptureOutput()) checkErr(runner.CommitLogs()) checkErr(runner.UpdateContainerFinal()) - - // The real log is already closed, but then we opened - // a new one in case we needed to log anything while - // finalizing. - runner.CrunchLog.Close() }() - err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container) + err = runner.fetchContainerRecord() if err != nil { - err = fmt.Errorf("While getting container record: %v", err) return } // setup signal handling - runner.SetupSignals() + runner.setupSignals() // check for and/or load image err = runner.LoadImage() @@ -1280,6 +1543,24 @@ func (runner *ContainerRunner) Run() (err error) { return } +// Fetch the current container record (uuid = runner.Container.UUID) +// into runner.Container. +func (runner *ContainerRunner) fetchContainerRecord() error { + reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil) + if err != nil { + return fmt.Errorf("error fetching container record: %v", err) + } + defer reader.Close() + + dec := json.NewDecoder(reader) + dec.UseNumber() + err = dec.Decode(&runner.Container) + if err != nil { + return fmt.Errorf("error decoding container record: %v", err) + } + return nil +} + // NewContainerRunner creates a new container runner. func NewContainerRunner(api IArvadosClient, kc IKeepClient, @@ -1294,6 +1575,9 @@ func NewContainerRunner(api IArvadosClient, cr.Container.UUID = containerUUID cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run")) cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) + + loadLogThrottleParams(api) + return cr } @@ -1311,8 +1595,18 @@ func main() { networkMode := flag.String("container-network-mode", "default", `Set networking mode for container. Corresponds to Docker network mode (--net). `) + memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container") + getVersion := flag.Bool("version", false, "Print version information and exit.") flag.Parse() + // Print version information if requested + if *getVersion { + fmt.Printf("Version: %s\n", version) + os.Exit(0) + } + + log.Printf("crunch-run %q started", version) + containerId := flag.Arg(0) if *caCertsPath != "" { @@ -1330,6 +1624,7 @@ func main() { if err != nil { log.Fatalf("%s: %v", containerId, err) } + kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} kc.Retries = 4 var docker *dockerclient.Client @@ -1354,9 +1649,24 @@ func main() { cr.expectCgroupParent = p } - err = cr.Run() - if err != nil { - log.Fatalf("%s: %v", containerId, err) + runerr := cr.Run() + + if *memprofile != "" { + f, err := os.Create(*memprofile) + if err != nil { + log.Printf("could not create memory profile: ", err) + } + runtime.GC() // get up-to-date statistics + if err := pprof.WriteHeapProfile(f); err != nil { + log.Printf("could not write memory profile: ", err) + } + closeerr := f.Close() + if closeerr != nil { + log.Printf("closing memprofile file: ", err) + } } + if runerr != nil { + log.Fatalf("%s: %v", containerId, runerr) + } }