X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/555c67a6126b88b592110b82bd96fed5cff5da31..5e575029573df492d5d091ac504f2f2e7b9969ef:/lib/crunchrun/crunchrun.go?ds=sidebyside diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 3c9c381619..42f143f1cb 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -55,7 +55,7 @@ var ErrCancelled = errors.New("Cancelled") // IKeepClient is the minimal Keep API methods used by crunch-run. type IKeepClient interface { - PutB(buf []byte) (string, int, error) + BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) ReadAt(locator string, p []byte, off int) (int, error) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) LocalLocator(locator string) (string, error) @@ -66,7 +66,7 @@ type IKeepClient interface { // NewLogWriter is a factory function to create a new log writer. type NewLogWriter func(name string) (io.WriteCloser, error) -type RunArvMount func(args []string, tok string) (*exec.Cmd, error) +type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error) type MkTempDir func(string, string) (string, error) @@ -77,7 +77,10 @@ type PsProcess interface { // ContainerRunner is the main stateful struct used for a single execution of a // container. type ContainerRunner struct { - executor containerExecutor + executor containerExecutor + executorStdin io.Closer + executorStdout io.Closer + executorStderr io.Closer // Dispatcher client is initialized with the Dispatcher token. // This is a privileged token used to manage container status @@ -106,8 +109,6 @@ type ContainerRunner struct { ExitCode *int NewLogWriter NewLogWriter CrunchLog *ThrottledLogger - Stdout io.WriteCloser - Stderr io.WriteCloser logUUID string logMtx sync.Mutex LogCollection arvados.CollectionFileSystem @@ -259,23 +260,21 @@ func (runner *ContainerRunner) LoadImage() (string, error) { return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles) } imageID := tarfiles[0][:len(tarfiles[0])-4] - imageFile := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + tarfiles[0] + imageTarballPath := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + imageID + ".tar" runner.CrunchLog.Printf("Using Docker image id %q", imageID) - if !runner.executor.ImageLoaded(imageID) { - runner.CrunchLog.Print("Loading Docker image from keep") - err = runner.executor.LoadImage(imageFile) - if err != nil { - return "", err - } - } else { - runner.CrunchLog.Print("Docker image is available") + runner.CrunchLog.Print("Loading Docker image from keep") + err = runner.executor.LoadImage(imageID, imageTarballPath, runner.Container, runner.ArvMountPoint, + runner.containerClient) + if err != nil { + return "", err } + return imageID, nil } -func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) { - c = exec.Command("arv-mount", arvMountCmd...) +func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) { + c = exec.Command(cmdline[0], cmdline[1:]...) // Copy our environment, but override ARVADOS_API_TOKEN with // the container auth token. @@ -292,8 +291,16 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) ( return nil, err } runner.arvMountLog = NewThrottledLogger(w) + scanner := logScanner{ + Patterns: []string{ + "Keep write error", + "Block not found error", + "Unhandled exception during FUSE operation", + }, + ReportFunc: runner.reportArvMountWarning, + } c.Stdout = runner.arvMountLog - c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr) + c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner) runner.CrunchLog.Printf("Running %v", c.Args) @@ -393,6 +400,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { pdhOnly := true tmpcount := 0 arvMountCmd := []string{ + "arv-mount", "--foreground", "--allow-other", "--read-write", @@ -598,6 +606,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { } else { arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id") } + arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid") arvMountCmd = append(arvMountCmd, runner.ArvMountPoint) runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token) @@ -877,7 +886,7 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { // CreateContainer creates the docker container. func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error { - var stdin io.ReadCloser + var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil)) if mnt, ok := runner.Container.Mounts["stdin"]; ok { switch mnt.Kind { case "collection": @@ -954,6 +963,9 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st if !runner.enableMemoryLimit { ram = 0 } + runner.executorStdin = stdin + runner.executorStdout = stdout + runner.executorStderr = stderr return runner.executor.Create(containerSpec{ Image: imageID, VCPUs: runner.Container.RuntimeConstraints.VCPUs, @@ -1018,6 +1030,27 @@ func (runner *ContainerRunner) WaitFinish() error { } runner.ExitCode = &exitcode + var returnErr error + if err = runner.executorStdin.Close(); err != nil { + err = fmt.Errorf("error closing container stdin: %s", err) + runner.CrunchLog.Printf("%s", err) + returnErr = err + } + if err = runner.executorStdout.Close(); err != nil { + err = fmt.Errorf("error closing container stdout: %s", err) + runner.CrunchLog.Printf("%s", err) + if returnErr == nil { + returnErr = err + } + } + if err = runner.executorStderr.Close(); err != nil { + err = fmt.Errorf("error closing container stderr: %s", err) + runner.CrunchLog.Printf("%s", err) + if returnErr == nil { + returnErr = err + } + } + if runner.statReporter != nil { runner.statReporter.Stop() err = runner.statLogger.Close() @@ -1025,7 +1058,7 @@ func (runner *ContainerRunner) WaitFinish() error { runner.CrunchLog.Printf("error closing crunchstat logs: %v", err) } } - return nil + return returnErr } func (runner *ContainerRunner) updateLogs() { @@ -1076,6 +1109,21 @@ func (runner *ContainerRunner) updateLogs() { } } +func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) { + var updated arvados.Container + err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ + "container": arvadosclient.Dict{ + "runtime_status": arvadosclient.Dict{ + "warning": "arv-mount: " + pattern, + "warningDetail": text, + }, + }, + }, &updated) + if err != nil { + runner.CrunchLog.Printf("error updating container runtime_status: %s", err) + } +} + // CaptureOutput saves data from the container's output directory if // needed, and updates the container output accordingly. func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error { @@ -1146,6 +1194,7 @@ func (runner *ContainerRunner) CleanupDirs() { if umnterr != nil { runner.CrunchLog.Printf("Error unmounting: %v", umnterr) + runner.ArvMount.Process.Kill() } else { // If arv-mount --unmount gets stuck for any reason, we // don't want to wait for it forever. Do Wait() in a goroutine @@ -1176,12 +1225,14 @@ func (runner *ContainerRunner) CleanupDirs() { } } } + runner.ArvMount = nil } if runner.ArvMountPoint != "" { if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil { runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr) } + runner.ArvMountPoint = "" } if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil { @@ -1350,7 +1401,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err // Run the full container lifecycle. func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String()) - runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID) + runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime()) hostname, hosterr := os.Hostname() if hosterr != nil { @@ -1416,6 +1467,7 @@ func (runner *ContainerRunner) Run() (err error) { } checkErr("stopHoststat", runner.stopHoststat()) checkErr("CommitLogs", runner.CommitLogs()) + runner.CleanupDirs() checkErr("UpdateContainerFinal", runner.UpdateContainerFinal()) }()