17813: Refactor singularity image loading / caching / conversion
[arvados.git] / lib / crunchrun / crunchrun.go
index 5638e81e4de6670673dc2163577768323919d551..d4a8650726d2aae9d6b222cf0a6baa89a2091dad 100644 (file)
@@ -55,11 +55,12 @@ var ErrCancelled = errors.New("Cancelled")
 
 // IKeepClient is the minimal Keep API methods used by crunch-run.
 type IKeepClient interface {
-       PutB(buf []byte) (string, int, error)
+       BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error)
        ReadAt(locator string, p []byte, off int) (int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
        LocalLocator(locator string) (string, error)
        ClearBlockCache()
+       SetStorageClasses(sc []string)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -76,7 +77,10 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-       executor containerExecutor
+       executor       containerExecutor
+       executorStdin  io.Closer
+       executorStdout io.Closer
+       executorStderr io.Closer
 
        // Dispatcher client is initialized with the Dispatcher token.
        // This is a privileged token used to manage container status
@@ -105,8 +109,6 @@ type ContainerRunner struct {
        ExitCode      *int
        NewLogWriter  NewLogWriter
        CrunchLog     *ThrottledLogger
-       Stdout        io.WriteCloser
-       Stderr        io.WriteCloser
        logUUID       string
        logMtx        sync.Mutex
        LogCollection arvados.CollectionFileSystem
@@ -258,18 +260,15 @@ func (runner *ContainerRunner) LoadImage() (string, error) {
                return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles)
        }
        imageID := tarfiles[0][:len(tarfiles[0])-4]
-       imageFile := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + tarfiles[0]
        runner.CrunchLog.Printf("Using Docker image id %q", imageID)
 
-       if !runner.executor.ImageLoaded(imageID) {
-               runner.CrunchLog.Print("Loading Docker image from keep")
-               err = runner.executor.LoadImage(imageFile)
-               if err != nil {
-                       return "", err
-               }
-       } else {
-               runner.CrunchLog.Print("Docker image is available")
+       runner.CrunchLog.Print("Loading Docker image from keep")
+       err = runner.executor.LoadImage(imageID, runner.Container, runner.ArvMountPoint,
+               runner.containerClient, runner.ContainerKeepClient)
+       if err != nil {
+               return "", err
        }
+
        return imageID, nil
 }
 
@@ -395,6 +394,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                "--foreground",
                "--allow-other",
                "--read-write",
+               "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
                fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
 
        if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
@@ -596,6 +596,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        } else {
                arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
        }
+       arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
        arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
 
        runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
@@ -875,7 +876,7 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
 
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
-       var stdin io.ReadCloser
+       var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
        if mnt, ok := runner.Container.Mounts["stdin"]; ok {
                switch mnt.Kind {
                case "collection":
@@ -952,6 +953,9 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
        if !runner.enableMemoryLimit {
                ram = 0
        }
+       runner.executorStdin = stdin
+       runner.executorStdout = stdout
+       runner.executorStderr = stderr
        return runner.executor.Create(containerSpec{
                Image:         imageID,
                VCPUs:         runner.Container.RuntimeConstraints.VCPUs,
@@ -1016,6 +1020,27 @@ func (runner *ContainerRunner) WaitFinish() error {
        }
        runner.ExitCode = &exitcode
 
+       var returnErr error
+       if err = runner.executorStdin.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdin: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               returnErr = err
+       }
+       if err = runner.executorStdout.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdout: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+       if err = runner.executorStderr.Close(); err != nil {
+               err = fmt.Errorf("error closing container stderr: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+
        if runner.statReporter != nil {
                runner.statReporter.Stop()
                err = runner.statLogger.Close()
@@ -1023,7 +1048,7 @@ func (runner *ContainerRunner) WaitFinish() error {
                        runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
                }
        }
-       return nil
+       return returnErr
 }
 
 func (runner *ContainerRunner) updateLogs() {
@@ -1174,12 +1199,14 @@ func (runner *ContainerRunner) CleanupDirs() {
                                }
                        }
                }
+               runner.ArvMount = nil
        }
 
        if runner.ArvMountPoint != "" {
                if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
                        runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
                }
+               runner.ArvMountPoint = ""
        }
 
        if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
@@ -1414,6 +1441,7 @@ func (runner *ContainerRunner) Run() (err error) {
                }
                checkErr("stopHoststat", runner.stopHoststat())
                checkErr("CommitLogs", runner.CommitLogs())
+               runner.CleanupDirs()
                checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
        }()
 
@@ -1519,6 +1547,9 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
                return fmt.Errorf("error creating container API client: %v", err)
        }
 
+       runner.ContainerKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+       runner.DispatcherKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+
        err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
        if err != nil {
                if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {