Merge branch '17749-s3-prefixes'
[arvados.git] / lib / crunchrun / crunchrun.go
index 5638e81e4de6670673dc2163577768323919d551..42f143f1cb8fe6bb97bfff2511e9f318f37359af 100644 (file)
@@ -55,17 +55,18 @@ var ErrCancelled = errors.New("Cancelled")
 
 // IKeepClient is the minimal Keep API methods used by crunch-run.
 type IKeepClient interface {
-       PutB(buf []byte) (string, int, error)
+       BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error)
        ReadAt(locator string, p []byte, off int) (int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
        LocalLocator(locator string) (string, error)
        ClearBlockCache()
+       SetStorageClasses(sc []string)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
 type NewLogWriter func(name string) (io.WriteCloser, error)
 
-type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
+type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
 
 type MkTempDir func(string, string) (string, error)
 
@@ -76,7 +77,10 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-       executor containerExecutor
+       executor       containerExecutor
+       executorStdin  io.Closer
+       executorStdout io.Closer
+       executorStderr io.Closer
 
        // Dispatcher client is initialized with the Dispatcher token.
        // This is a privileged token used to manage container status
@@ -105,8 +109,6 @@ type ContainerRunner struct {
        ExitCode      *int
        NewLogWriter  NewLogWriter
        CrunchLog     *ThrottledLogger
-       Stdout        io.WriteCloser
-       Stderr        io.WriteCloser
        logUUID       string
        logMtx        sync.Mutex
        LogCollection arvados.CollectionFileSystem
@@ -258,23 +260,21 @@ func (runner *ContainerRunner) LoadImage() (string, error) {
                return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles)
        }
        imageID := tarfiles[0][:len(tarfiles[0])-4]
-       imageFile := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + tarfiles[0]
+       imageTarballPath := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + imageID + ".tar"
        runner.CrunchLog.Printf("Using Docker image id %q", imageID)
 
-       if !runner.executor.ImageLoaded(imageID) {
-               runner.CrunchLog.Print("Loading Docker image from keep")
-               err = runner.executor.LoadImage(imageFile)
-               if err != nil {
-                       return "", err
-               }
-       } else {
-               runner.CrunchLog.Print("Docker image is available")
+       runner.CrunchLog.Print("Loading Docker image from keep")
+       err = runner.executor.LoadImage(imageID, imageTarballPath, runner.Container, runner.ArvMountPoint,
+               runner.containerClient)
+       if err != nil {
+               return "", err
        }
+
        return imageID, nil
 }
 
-func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
-       c = exec.Command("arv-mount", arvMountCmd...)
+func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) {
+       c = exec.Command(cmdline[0], cmdline[1:]...)
 
        // Copy our environment, but override ARVADOS_API_TOKEN with
        // the container auth token.
@@ -291,8 +291,16 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
                return nil, err
        }
        runner.arvMountLog = NewThrottledLogger(w)
+       scanner := logScanner{
+               Patterns: []string{
+                       "Keep write error",
+                       "Block not found error",
+                       "Unhandled exception during FUSE operation",
+               },
+               ReportFunc: runner.reportArvMountWarning,
+       }
        c.Stdout = runner.arvMountLog
-       c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr)
+       c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
 
        runner.CrunchLog.Printf("Running %v", c.Args)
 
@@ -392,9 +400,11 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        pdhOnly := true
        tmpcount := 0
        arvMountCmd := []string{
+               "arv-mount",
                "--foreground",
                "--allow-other",
                "--read-write",
+               "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
                fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
 
        if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
@@ -596,6 +606,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        } else {
                arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
        }
+       arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
        arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
 
        runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
@@ -875,7 +886,7 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
 
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
-       var stdin io.ReadCloser
+       var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
        if mnt, ok := runner.Container.Mounts["stdin"]; ok {
                switch mnt.Kind {
                case "collection":
@@ -952,6 +963,9 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
        if !runner.enableMemoryLimit {
                ram = 0
        }
+       runner.executorStdin = stdin
+       runner.executorStdout = stdout
+       runner.executorStderr = stderr
        return runner.executor.Create(containerSpec{
                Image:         imageID,
                VCPUs:         runner.Container.RuntimeConstraints.VCPUs,
@@ -1016,6 +1030,27 @@ func (runner *ContainerRunner) WaitFinish() error {
        }
        runner.ExitCode = &exitcode
 
+       var returnErr error
+       if err = runner.executorStdin.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdin: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               returnErr = err
+       }
+       if err = runner.executorStdout.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdout: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+       if err = runner.executorStderr.Close(); err != nil {
+               err = fmt.Errorf("error closing container stderr: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+
        if runner.statReporter != nil {
                runner.statReporter.Stop()
                err = runner.statLogger.Close()
@@ -1023,7 +1058,7 @@ func (runner *ContainerRunner) WaitFinish() error {
                        runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
                }
        }
-       return nil
+       return returnErr
 }
 
 func (runner *ContainerRunner) updateLogs() {
@@ -1074,6 +1109,21 @@ func (runner *ContainerRunner) updateLogs() {
        }
 }
 
+func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
+       var updated arvados.Container
+       err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               "container": arvadosclient.Dict{
+                       "runtime_status": arvadosclient.Dict{
+                               "warning":       "arv-mount: " + pattern,
+                               "warningDetail": text,
+                       },
+               },
+       }, &updated)
+       if err != nil {
+               runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
+       }
+}
+
 // CaptureOutput saves data from the container's output directory if
 // needed, and updates the container output accordingly.
 func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
@@ -1144,6 +1194,7 @@ func (runner *ContainerRunner) CleanupDirs() {
 
                if umnterr != nil {
                        runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
+                       runner.ArvMount.Process.Kill()
                } else {
                        // If arv-mount --unmount gets stuck for any reason, we
                        // don't want to wait for it forever.  Do Wait() in a goroutine
@@ -1174,12 +1225,14 @@ func (runner *ContainerRunner) CleanupDirs() {
                                }
                        }
                }
+               runner.ArvMount = nil
        }
 
        if runner.ArvMountPoint != "" {
                if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
                        runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
                }
+               runner.ArvMountPoint = ""
        }
 
        if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
@@ -1348,7 +1401,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err
 // Run the full container lifecycle.
 func (runner *ContainerRunner) Run() (err error) {
        runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
-       runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
+       runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime())
 
        hostname, hosterr := os.Hostname()
        if hosterr != nil {
@@ -1414,6 +1467,7 @@ func (runner *ContainerRunner) Run() (err error) {
                }
                checkErr("stopHoststat", runner.stopHoststat())
                checkErr("CommitLogs", runner.CommitLogs())
+               runner.CleanupDirs()
                checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
        }()
 
@@ -1519,6 +1573,9 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
                return fmt.Errorf("error creating container API client: %v", err)
        }
 
+       runner.ContainerKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+       runner.DispatcherKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+
        err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
        if err != nil {
                if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {