Merge branch '8784-dir-listings'
[arvados.git] / services / crunch-run / crunchrun.go
index 80ca27f68d46a8a5e56266217ab14cdb7417bb96..810955c0da8b2119dd1cec42fda365a782a37896 100644 (file)
@@ -1,6 +1,11 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
+       "bytes"
        "context"
        "encoding/json"
        "errors"
@@ -48,7 +53,7 @@ var ErrCancelled = errors.New("Cancelled")
 // IKeepClient is the minimal Keep API methods used by crunch-run.
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
-       ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
+       ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -60,15 +65,62 @@ type MkTempDir func(string, string) (string, error)
 
 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
 type ThinDockerClient interface {
-       ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
-       ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
-       ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
+       ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
        ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
                networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
        ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
-       ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
        ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
-       ContainerWait(ctx context.Context, container string) (int64, error)
+       ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
+       ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
+       ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
+       ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
+}
+
+// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
+// that executes the docker requests on dockerclient.Client
+type ThinDockerClientProxy struct {
+       Docker *dockerclient.Client
+}
+
+// ContainerAttach invokes dockerclient.Client.ContainerAttach
+func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
+       return proxy.Docker.ContainerAttach(ctx, container, options)
+}
+
+// ContainerCreate invokes dockerclient.Client.ContainerCreate
+func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+       networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
+       return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
+}
+
+// ContainerStart invokes dockerclient.Client.ContainerStart
+func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+       return proxy.Docker.ContainerStart(ctx, container, options)
+}
+
+// ContainerStop invokes dockerclient.Client.ContainerStop
+func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+       return proxy.Docker.ContainerStop(ctx, container, timeout)
+}
+
+// ContainerWait invokes dockerclient.Client.ContainerWait
+func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
+       return proxy.Docker.ContainerWait(ctx, container, condition)
+}
+
+// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
+func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+       return proxy.Docker.ImageInspectWithRaw(ctx, image)
+}
+
+// ImageLoad invokes dockerclient.Client.ImageLoad
+func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+       return proxy.Docker.ImageLoad(ctx, input, quiet)
+}
+
+// ImageRemove invokes dockerclient.Client.ImageRemove
+func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
+       return proxy.Docker.ImageRemove(ctx, image, options)
 }
 
 // ContainerRunner is the main stateful struct used for a single execution of a
@@ -87,7 +139,7 @@ type ContainerRunner struct {
        loggingDone   chan bool
        CrunchLog     *ThrottledLogger
        Stdout        io.WriteCloser
-       Stderr        *ThrottledLogger
+       Stderr        io.WriteCloser
        LogCollection *CollectionWriter
        LogsPDH       *string
        RunArvMount
@@ -97,6 +149,7 @@ type ContainerRunner struct {
        HostOutputDir  string
        CleanupTempDir []string
        Binds          []string
+       Volumes        map[string]struct{}
        OutputPDH      *string
        SigChan        chan os.Signal
        ArvMountExit   chan error
@@ -195,10 +248,10 @@ func (runner *ContainerRunner) LoadImage() (err error) {
                }
 
                response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, false)
-               response.Body.Close()
                if err != nil {
                        return fmt.Errorf("While loading container image into Docker: %v", err)
                }
+               response.Body.Close()
        } else {
                runner.CrunchLog.Print("Docker image is available")
        }
@@ -288,6 +341,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        collectionPaths := []string{}
        runner.Binds = nil
+       runner.Volumes = make(map[string]struct{})
        needCertMount := true
 
        var binds []string
@@ -298,10 +352,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
-               if bind == "stdout" {
+               if bind == "stdout" || bind == "stderr" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
-                               return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+                               return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
                        }
 
                        // Does path start with OutputPath?
@@ -310,7 +364,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                prefix += "/"
                        }
                        if !strings.HasPrefix(mnt.Path, prefix) {
-                               return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+                               return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
+                       }
+               }
+
+               if bind == "stdin" {
+                       // Is it a "collection" mount kind?
+                       if mnt.Kind != "collection" && mnt.Kind != "json" {
+                               return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
                        }
                }
 
@@ -325,7 +386,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
 
                switch {
-               case mnt.Kind == "collection":
+               case mnt.Kind == "collection" && bind != "stdin":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
                                return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
@@ -373,24 +434,25 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                        collectionPaths = append(collectionPaths, src)
 
-               case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
-                       runner.HostOutputDir, err = runner.MkTempDir("", "")
+               case mnt.Kind == "tmp":
+                       var tmpdir string
+                       tmpdir, err = runner.MkTempDir("", "")
                        if err != nil {
                                return fmt.Errorf("While creating mount temp dir: %v", err)
                        }
-                       st, staterr := os.Stat(runner.HostOutputDir)
+                       st, staterr := os.Stat(tmpdir)
                        if staterr != nil {
                                return fmt.Errorf("While Stat on temp dir: %v", staterr)
                        }
-                       err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
+                       err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
                        if staterr != nil {
                                return fmt.Errorf("While Chmod temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
-                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
-
-               case mnt.Kind == "tmp":
-                       runner.Binds = append(runner.Binds, bind)
+                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
+                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
+                       if bind == runner.Container.OutputPath {
+                               runner.HostOutputDir = tmpdir
+                       }
 
                case mnt.Kind == "json":
                        jsondata, err := json.Marshal(mnt.Content)
@@ -576,11 +638,12 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
 // Get and save the raw JSON container record from the API server
 func (runner *ContainerRunner) LogContainerRecord() (err error) {
        w := &ArvLogWriter{
-               runner.ArvClient,
-               runner.Container.UUID,
-               "container",
-               runner.LogCollection.Open("container.json"),
+               ArvClient:     runner.ArvClient,
+               UUID:          runner.Container.UUID,
+               loggingStream: "container",
+               writeCloser:   runner.LogCollection.Open("container.json"),
        }
+
        // Get Container record JSON from the API Server
        reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
        if err != nil {
@@ -610,14 +673,44 @@ func (runner *ContainerRunner) LogContainerRecord() (err error) {
        return nil
 }
 
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
+// AttachStreams connects the docker container stdin, stdout and stderr logs
+// to the Arvados logger which logs to Keep and the API server logs table.
 func (runner *ContainerRunner) AttachStreams() (err error) {
 
        runner.CrunchLog.Print("Attaching container streams")
 
+       // If stdin mount is provided, attach it to the docker container
+       var stdinRdr arvados.File
+       var stdinJson []byte
+       if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+               if stdinMnt.Kind == "collection" {
+                       var stdinColl arvados.Collection
+                       collId := stdinMnt.UUID
+                       if collId == "" {
+                               collId = stdinMnt.PortableDataHash
+                       }
+                       err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+                       if err != nil {
+                               return fmt.Errorf("While getting stding collection: %v", err)
+                       }
+
+                       stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+                       if os.IsNotExist(err) {
+                               return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+                       } else if err != nil {
+                               return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+                       }
+               } else if stdinMnt.Kind == "json" {
+                       stdinJson, err = json.Marshal(stdinMnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("While encoding stdin json data: %v", err)
+                       }
+               }
+       }
+
+       stdinUsed := stdinRdr != nil || len(stdinJson) != 0
        response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
+               dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
                return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
        }
@@ -625,37 +718,76 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        runner.loggingDone = make(chan bool)
 
        if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
-               stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
-               index := strings.LastIndex(stdoutPath, "/")
-               if index > 0 {
-                       subdirs := stdoutPath[:index]
-                       if subdirs != "" {
-                               st, err := os.Stat(runner.HostOutputDir)
-                               if err != nil {
-                                       return fmt.Errorf("While Stat on temp dir: %v", err)
-                               }
-                               stdoutPath := path.Join(runner.HostOutputDir, subdirs)
-                               err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
-                               if err != nil {
-                                       return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
-                               }
-                       }
-               }
-               stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+               stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
                if err != nil {
-                       return fmt.Errorf("While creating stdout file: %v", err)
+                       return err
                }
                runner.Stdout = stdoutFile
        } else {
                runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
        }
-       runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+
+       if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
+               stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
+               if err != nil {
+                       return err
+               }
+               runner.Stderr = stderrFile
+       } else {
+               runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+       }
+
+       if stdinRdr != nil {
+               go func() {
+                       _, err := io.Copy(response.Conn, stdinRdr)
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+                               runner.stop()
+                       }
+                       stdinRdr.Close()
+                       response.CloseWrite()
+               }()
+       } else if len(stdinJson) != 0 {
+               go func() {
+                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+                               runner.stop()
+                       }
+                       response.CloseWrite()
+               }()
+       }
 
        go runner.ProcessDockerAttach(response.Reader)
 
        return nil
 }
 
+func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
+       stdoutPath := mntPath[len(runner.Container.OutputPath):]
+       index := strings.LastIndex(stdoutPath, "/")
+       if index > 0 {
+               subdirs := stdoutPath[:index]
+               if subdirs != "" {
+                       st, err := os.Stat(runner.HostOutputDir)
+                       if err != nil {
+                               return nil, fmt.Errorf("While Stat on temp dir: %v", err)
+                       }
+                       stdoutPath := filepath.Join(runner.HostOutputDir, subdirs)
+                       err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+                       if err != nil {
+                               return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+                       }
+               }
+       }
+       stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, stdoutPath))
+       if err != nil {
+               return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
+       }
+
+       return stdoutFile, nil
+}
+
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer() error {
        runner.CrunchLog.Print("Creating Docker container")
@@ -669,13 +801,16 @@ func (runner *ContainerRunner) CreateContainer() error {
                runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
        }
 
-       runner.ContainerID = createdBody.ID
-       runner.HostConfig = dockerclient.HostConfig{
-               Binds:  runner.Binds,
-               Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
+       runner.ContainerConfig.Volumes = runner.Volumes
+
+       runner.HostConfig = dockercontainer.HostConfig{
+               Binds: runner.Binds,
                LogConfig: dockercontainer.LogConfig{
                        Type: "none",
                },
+               Resources: dockercontainer.Resources{
+                       CgroupParent: runner.setCgroupParent,
+               },
        }
 
        if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
@@ -688,20 +823,29 @@ func (runner *ContainerRunner) CreateContainer() error {
                        "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
                        "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
                )
-               runner.HostConfig.NetworkMode = runner.networkMode
+               runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
        } else {
                if runner.enableNetwork == "always" {
-                       runner.HostConfig.NetworkMode = runner.networkMode
+                       runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
                } else {
-                       runner.HostConfig.NetworkMode = "none"
+                       runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
                }
        }
 
-       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, nil, nil, "")
+       _, stdinUsed := runner.Container.Mounts["stdin"]
+       runner.ContainerConfig.OpenStdin = stdinUsed
+       runner.ContainerConfig.StdinOnce = stdinUsed
+       runner.ContainerConfig.AttachStdin = stdinUsed
+       runner.ContainerConfig.AttachStdout = true
+       runner.ContainerConfig.AttachStderr = true
+
+       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)
        }
 
+       runner.ContainerID = createdBody.ID
+
        return runner.AttachStreams()
 }
 
@@ -724,23 +868,28 @@ func (runner *ContainerRunner) StartContainer() error {
 
 // WaitFinish waits for the container to terminate, capture the exit code, and
 // close the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() error {
+func (runner *ContainerRunner) WaitFinish() (err error) {
        runner.CrunchLog.Print("Waiting for container to finish")
 
-       waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID)
+       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+
+       var waitBody dockercontainer.ContainerWaitOKBody
+       select {
+       case waitBody = <-waitOk:
+       case err = <-waitErr:
+       }
+
        if err != nil {
                return fmt.Errorf("container wait: %v", err)
        }
 
-       if waitDocker != 0 { // what is the acceptable waitDocker code?
-               runner.CrunchLog.Printf("container wait API status code: %v", waitDocker)
-               code := int(waitDocker)
-               runner.ExitCode = &code
-       }
+       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+       code := int(waitBody.StatusCode)
+       runner.ExitCode = &code
 
        waitMount := runner.ArvMountExit
        select {
-       case err := <-waitMount:
+       case err = <-waitMount:
                runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
                waitMount = nil
                runner.stop()
@@ -783,14 +932,91 @@ func (runner *ContainerRunner) CaptureOutput() error {
                return fmt.Errorf("While checking host output path: %v", err)
        }
 
+       // Pre-populate output from the configured mount points
+       var binds []string
+       for bind, mnt := range runner.Container.Mounts {
+               if mnt.Kind == "collection" {
+                       binds = append(binds, bind)
+               }
+       }
+       sort.Strings(binds)
+
        var manifestText string
 
        collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
        _, err = os.Stat(collectionMetafile)
        if err != nil {
                // Regular directory
+
+               // Find symlinks to arv-mounted files & dirs.
+               err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
+                       if err != nil {
+                               return err
+                       }
+                       if info.Mode()&os.ModeSymlink == 0 {
+                               return nil
+                       }
+                       // read link to get container internal path
+                       // only support 1 level of symlinking here.
+                       var tgt string
+                       tgt, err = os.Readlink(path)
+                       if err != nil {
+                               return err
+                       }
+
+                       // get path relative to output dir
+                       outputSuffix := path[len(runner.HostOutputDir):]
+
+                       if strings.HasPrefix(tgt, "/") {
+                               // go through mounts and try reverse map to collection reference
+                               for _, bind := range binds {
+                                       mnt := runner.Container.Mounts[bind]
+                                       if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+                                               // get path relative to bind
+                                               targetSuffix := tgt[len(bind):]
+
+                                               // Copy mount and adjust the path to add path relative to the bind
+                                               adjustedMount := mnt
+                                               adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
+
+                                               // get manifest text
+                                               var m string
+                                               m, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
+                                               if err != nil {
+                                                       return err
+                                               }
+                                               manifestText = manifestText + m
+                                               // delete symlink so WriteTree won't try to to dereference it.
+                                               os.Remove(path)
+                                               return nil
+                                       }
+                               }
+                       }
+
+                       // Not a link to a mount.  Must be dereferencible and
+                       // point into the output directory.
+                       tgt, err = filepath.EvalSymlinks(path)
+                       if err != nil {
+                               os.Remove(path)
+                               return err
+                       }
+
+                       // Symlink target must be within the output directory otherwise it's an error.
+                       if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
+                               os.Remove(path)
+                               return fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.",
+                                       outputSuffix, tgt)
+                       }
+                       return nil
+               })
+               if err != nil {
+                       return fmt.Errorf("While checking output symlinks: %v", err)
+               }
+
                cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
-               manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+               var m string
+               m, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+               manifestText = manifestText + m
                if err != nil {
                        return fmt.Errorf("While uploading output files: %v", err)
                }
@@ -810,13 +1036,6 @@ func (runner *ContainerRunner) CaptureOutput() error {
                manifestText = rec.ManifestText
        }
 
-       // Pre-populate output from the configured mount points
-       var binds []string
-       for bind, _ := range runner.Container.Mounts {
-               binds = append(binds, bind)
-       }
-       sort.Strings(binds)
-
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
 
@@ -931,8 +1150,8 @@ func (runner *ContainerRunner) CommitLogs() error {
        // point, but re-open crunch log with ArvClient in case there are any
        // other further (such as failing to write the log to Keep!) while
        // shutting down
-       runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
-               "crunch-run", nil})
+       runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+               UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
 
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
@@ -1019,7 +1238,8 @@ func (runner *ContainerRunner) IsCancelled() bool {
 
 // NewArvLogWriter creates an ArvLogWriter
 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
-       return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
+       return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name,
+               writeCloser: runner.LogCollection.Open(name + ".txt")}
 }
 
 // Run the full container lifecycle.
@@ -1053,6 +1273,10 @@ func (runner *ContainerRunner) Run() (err error) {
                        if err == nil {
                                err = e
                        }
+                       if runner.finalState == "Complete" {
+                               // There was an error in the finalization.
+                               runner.finalState = "Cancelled"
+                       }
                }
 
                // Log the error encountered in Run(), if any
@@ -1159,6 +1383,9 @@ func NewContainerRunner(api IArvadosClient,
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+
+       loadLogThrottleParams(api)
+
        return cr
 }
 
@@ -1205,7 +1432,9 @@ func main() {
                log.Fatalf("%s: %v", containerId, err)
        }
 
-       cr := NewContainerRunner(api, kc, docker, containerId)
+       dockerClientProxy := ThinDockerClientProxy{Docker: docker}
+
+       cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent