12614: Restore order: set state "Running" before starting the container.
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 3b31da567fd4ad4b0f3f84dae1af756336ff8260..180da0c6796eda8b354123c2d95ef82472010686 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1,16 +1,16 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
+       "bytes"
+       "context"
        "encoding/json"
        "errors"
        "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/lib/crunchstat"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
-       "github.com/curoverse/dockerclient"
        "io"
        "io/ioutil"
        "log"
@@ -19,11 +19,24 @@ import (
        "os/signal"
        "path"
        "path/filepath"
+       "runtime"
+       "runtime/pprof"
        "sort"
        "strings"
        "sync"
        "syscall"
        "time"
+
+       "git.curoverse.com/arvados.git/lib/crunchstat"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
+
+       dockertypes "github.com/docker/docker/api/types"
+       dockercontainer "github.com/docker/docker/api/types/container"
+       dockernetwork "github.com/docker/docker/api/types/network"
+       dockerclient "github.com/docker/docker/client"
 )
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -32,6 +45,7 @@ type IArvadosClient interface {
        Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
        Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
        Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
+       CallRaw(method string, resourceType string, uuid string, action string, parameters arvadosclient.Dict) (reader io.ReadCloser, err error)
        Discovery(key string) (interface{}, error)
 }
 
@@ -41,7 +55,8 @@ var ErrCancelled = errors.New("Cancelled")
 // IKeepClient is the minimal Keep API methods used by crunch-run.
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
-       ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
+       ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
+       ClearBlockCache()
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -53,14 +68,62 @@ type MkTempDir func(string, string) (string, error)
 
 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
 type ThinDockerClient interface {
-       StopContainer(id string, timeout int) error
-       InspectImage(id string) (*dockerclient.ImageInfo, error)
-       LoadImage(reader io.Reader) error
-       CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
-       StartContainer(id string, config *dockerclient.HostConfig) error
-       AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
-       Wait(id string) <-chan dockerclient.WaitResult
-       RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
+       ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
+       ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+               networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
+       ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
+       ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+       ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
+       ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
+       ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
+       ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
+}
+
+// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
+// that forwards each request to an underlying dockerclient.Client.
+type ThinDockerClientProxy struct {
+       Docker *dockerclient.Client
+}
+
+// ContainerAttach invokes dockerclient.Client.ContainerAttach
+func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
+       return proxy.Docker.ContainerAttach(ctx, container, options)
+}
+
+// ContainerCreate invokes dockerclient.Client.ContainerCreate
+func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+       networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
+       return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
+}
+
+// ContainerStart invokes dockerclient.Client.ContainerStart
+func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+       return proxy.Docker.ContainerStart(ctx, container, options)
+}
+
+// ContainerStop invokes dockerclient.Client.ContainerStop
+func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+       return proxy.Docker.ContainerStop(ctx, container, timeout)
+}
+
+// ContainerWait invokes dockerclient.Client.ContainerWait
+func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
+       return proxy.Docker.ContainerWait(ctx, container, condition)
+}
+
+// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
+func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+       return proxy.Docker.ImageInspectWithRaw(ctx, image)
+}
+
+// ImageLoad invokes dockerclient.Client.ImageLoad
+func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+       return proxy.Docker.ImageLoad(ctx, input, quiet)
+}
+
+// ImageRemove invokes dockerclient.Client.ImageRemove
+func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
+       return proxy.Docker.ImageRemove(ctx, image, options)
 }
 
 // ContainerRunner is the main stateful struct used for a single execution of a
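
The proxy above exists so tests can substitute a fake ThinDockerClient while production code wraps the real SDK client. A minimal wiring sketch, assuming it sits in this same package; the actual wiring lives in main(), outside this diff, and NewEnvClient is the docker/client constructor that configures the client from DOCKER_HOST and related environment variables:

    func newDockerProxy() (ThinDockerClientProxy, error) {
        // NewEnvClient reads DOCKER_HOST et al. from the environment.
        docker, err := dockerclient.NewEnvClient()
        if err != nil {
            return ThinDockerClientProxy{}, err
        }
        return ThinDockerClientProxy{Docker: docker}, nil
    }
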
@@ -70,8 +133,8 @@ type ContainerRunner struct {
        ArvClient IArvadosClient
        Kc        IKeepClient
        arvados.Container
-       dockerclient.ContainerConfig
-       dockerclient.HostConfig
+       ContainerConfig dockercontainer.Config
+       dockercontainer.HostConfig
        token       string
        ContainerID string
        ExitCode    *int
@@ -79,7 +142,7 @@ type ContainerRunner struct {
        loggingDone   chan bool
        CrunchLog     *ThrottledLogger
        Stdout        io.WriteCloser
-       Stderr        *ThrottledLogger
+       Stderr        io.WriteCloser
        LogCollection *CollectionWriter
        LogsPDH       *string
        RunArvMount
@@ -89,13 +152,11 @@ type ContainerRunner struct {
        HostOutputDir  string
        CleanupTempDir []string
        Binds          []string
+       Volumes        map[string]struct{}
        OutputPDH      *string
-       CancelLock     sync.Mutex
-       Cancelled      bool
        SigChan        chan os.Signal
        ArvMountExit   chan error
        finalState     string
-       trashLifetime  time.Duration
 
        statLogger   io.WriteCloser
        statReporter *crunchstat.Reporter
@@ -114,28 +175,83 @@ type ContainerRunner struct {
        // parent to be X" feature even on sites where the "specify
        // cgroup parent" feature breaks.
        setCgroupParent string
+
+       cStateLock sync.Mutex
+       cStarted   bool // StartContainer() succeeded
+       cCancelled bool // StopContainer() invoked
+
+       enableNetwork string // one of "default" or "always"
+       networkMode   string // passed through to HostConfig.NetworkMode
+       arvMountLog   *ThrottledLogger
 }
 
-// SetupSignals sets up signal handling to gracefully terminate the underlying
+// setupSignals sets up signal handling to gracefully terminate the underlying
 // Docker container and update state when receiving a TERM, INT or QUIT signal.
-func (runner *ContainerRunner) SetupSignals() {
+func (runner *ContainerRunner) setupSignals() {
        runner.SigChan = make(chan os.Signal, 1)
        signal.Notify(runner.SigChan, syscall.SIGTERM)
        signal.Notify(runner.SigChan, syscall.SIGINT)
        signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
-       go func(sig <-chan os.Signal) {
-               for range sig {
-                       if !runner.Cancelled {
-                               runner.CancelLock.Lock()
-                               runner.Cancelled = true
-                               if runner.ContainerID != "" {
-                                       runner.Docker.StopContainer(runner.ContainerID, 10)
+       go func(sig chan os.Signal) {
+               s := <-sig
+               if s != nil {
+                       runner.CrunchLog.Printf("Caught signal %v", s)
+               }
+               runner.stop()
+       }(runner.SigChan)
+}
+
+// stop the underlying Docker container.
+func (runner *ContainerRunner) stop() {
+       runner.cStateLock.Lock()
+       defer runner.cStateLock.Unlock()
+       if runner.cCancelled {
+               return
+       }
+       runner.cCancelled = true
+       if runner.cStarted {
+               // Give the container 10 seconds to shut down cleanly.
+               timeout := 10 * time.Second
+               err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &timeout)
+               if err != nil {
+                       runner.CrunchLog.Printf("ContainerStop failed: %s", err)
+               }
+               // Suppress multiple calls to stop()
+               runner.cStarted = false
+       }
+}
+
+func (runner *ContainerRunner) stopSignals() {
+       if runner.SigChan != nil {
+               signal.Stop(runner.SigChan)
+               close(runner.SigChan)
+       }
+}
+
+var errorBlacklist = []string{"Cannot connect to the Docker daemon"}
+var brokenNodeHook = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+
+func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+       for _, d := range errorBlacklist {
+               if strings.Contains(goterr.Error(), d) {
+                       runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+                       if *brokenNodeHook == "" {
+                               runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+                       } else {
+                               runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+                               // Run the broken-node hook script.
+                               c := exec.Command(*brokenNodeHook)
+                               c.Stdout = runner.CrunchLog
+                               c.Stderr = runner.CrunchLog
+                               err := c.Run()
+                               if err != nil {
+                                       runner.CrunchLog.Printf("Error running broken node hook: %v", err)
                                }
-                               runner.CancelLock.Unlock()
                        }
+                       return true
                }
-       }(runner.SigChan)
+       }
+       return false
 }
 
 // LoadImage determines the docker image id from the container record and
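
checkBrokenNode is meant to be consulted when container setup fails: a blacklist match means the node, not the container, is at fault, so the hook can take the node out of service. An illustrative call site (the real ones are in Run(), outside this hunk, and setupAndCheck is a hypothetical name):

    func setupAndCheck(runner *ContainerRunner) error {
        err := runner.CreateContainer()
        if err != nil {
            // On a match, the hook runs and the failure is logged as
            // a node problem rather than a container problem.
            runner.checkBrokenNode(err)
        }
        return err
    }
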
@@ -162,7 +278,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 
        runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
 
-       _, err = runner.Docker.InspectImage(imageID)
+       _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
        if err != nil {
                runner.CrunchLog.Print("Loading Docker image from keep")
 
@@ -172,16 +288,25 @@ func (runner *ContainerRunner) LoadImage() (err error) {
                        return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
                }
 
-               err = runner.Docker.LoadImage(readCloser)
+               response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
                if err != nil {
                        return fmt.Errorf("While loading container image into Docker: %v", err)
                }
+
+               defer response.Body.Close()
+               rbody, err := ioutil.ReadAll(response.Body)
+               if err != nil {
+                       return fmt.Errorf("Reading response to image load: %v", err)
+               }
+               runner.CrunchLog.Printf("Docker response: %s", rbody)
        } else {
                runner.CrunchLog.Print("Docker image is available")
        }
 
        runner.ContainerConfig.Image = imageID
 
+       runner.Kc.ClearBlockCache()
+
        return nil
 }
 
@@ -198,9 +323,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
        }
        c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
 
-       nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
-       c.Stdout = nt
-       c.Stderr = nt
+       runner.arvMountLog = NewThrottledLogger(runner.NewLogWriter("arv-mount"))
+       c.Stdout = runner.arvMountLog
+       c.Stderr = runner.arvMountLog
+
+       runner.CrunchLog.Printf("Running %v", c.Args)
 
        err = c.Start()
        if err != nil {
@@ -224,7 +351,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
        }()
 
        go func() {
-               runner.ArvMountExit <- c.Wait()
+               mnterr := c.Wait()
+               if mnterr != nil {
+                       runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
+               }
+               runner.ArvMountExit <- mnterr
                close(runner.ArvMountExit)
        }()
 
@@ -240,8 +371,6 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
        return c, nil
 }
 
-var tmpBackedOutputDir = false
-
 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
        if runner.ArvMountPoint == "" {
                runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
@@ -255,11 +384,13 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                return fmt.Errorf("While creating keep mount temp dir: %v", err)
        }
 
-       runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
-
        pdhOnly := true
        tmpcount := 0
-       arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
+       arvMountCmd := []string{
+               "--foreground",
+               "--allow-other",
+               "--read-write",
+               fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
 
        if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
                arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
@@ -267,20 +398,21 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        collectionPaths := []string{}
        runner.Binds = nil
+       runner.Volumes = make(map[string]struct{})
        needCertMount := true
 
        var binds []string
-       for bind, _ := range runner.Container.Mounts {
+       for bind := range runner.Container.Mounts {
                binds = append(binds, bind)
        }
        sort.Strings(binds)
 
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
-               if bind == "stdout" {
+               if bind == "stdout" || bind == "stderr" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
-                               return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+                               return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
                        }
 
                        // Does path start with OutputPath?
@@ -289,7 +421,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                prefix += "/"
                        }
                        if !strings.HasPrefix(mnt.Path, prefix) {
-                               return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+                               return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
+                       }
+               }
+
+               if bind == "stdin" {
+                       // Is it a "collection" mount kind?
+                       if mnt.Kind != "collection" && mnt.Kind != "json" {
+                               return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
                        }
                }
 
@@ -304,7 +443,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
 
                switch {
-               case mnt.Kind == "collection":
+               case mnt.Kind == "collection" && bind != "stdin":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
                                return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
@@ -319,7 +458,21 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                if mnt.Writable {
                                        return fmt.Errorf("Can never write to a collection specified by portable data hash")
                                }
+                               idx := strings.Index(mnt.PortableDataHash, "/")
+                               if idx > 0 {
+                                       mnt.Path = path.Clean(mnt.PortableDataHash[idx:])
+                                       mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
+                                       runner.Container.Mounts[bind] = mnt
+                               }
                                src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
+                               if mnt.Path != "" && mnt.Path != "." {
+                                       if strings.HasPrefix(mnt.Path, "./") {
+                                               mnt.Path = mnt.Path[2:]
+                                       } else if strings.HasPrefix(mnt.Path, "/") {
+                                               mnt.Path = mnt.Path[1:]
+                                       }
+                                       src += "/" + mnt.Path
+                               }
                        } else {
                                src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
                                arvMountCmd = append(arvMountCmd, "--mount-tmp")
@@ -338,25 +491,25 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                        collectionPaths = append(collectionPaths, src)
 
-               case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
-                       runner.HostOutputDir, err = runner.MkTempDir("", "")
+               case mnt.Kind == "tmp":
+                       var tmpdir string
+                       tmpdir, err = runner.MkTempDir("", "")
                        if err != nil {
                                return fmt.Errorf("While creating mount temp dir: %v", err)
                        }
-                       st, staterr := os.Stat(runner.HostOutputDir)
+                       st, staterr := os.Stat(tmpdir)
                        if staterr != nil {
                                return fmt.Errorf("While Stat on temp dir: %v", staterr)
                        }
-                       err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
+                       err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
-                       if staterr != nil {
+                       if err != nil {
                                return fmt.Errorf("While Chmod temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
-                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
-                       tmpBackedOutputDir = true
-
-               case mnt.Kind == "tmp":
-                       runner.Binds = append(runner.Binds, bind)
+                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
+                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
+                       if bind == runner.Container.OutputPath {
+                               runner.HostOutputDir = tmpdir
+                       }
 
                case mnt.Kind == "json":
                        jsondata, err := json.Marshal(mnt.Content)
@@ -484,15 +637,134 @@ func (runner *ContainerRunner) StartCrunchstat() {
        runner.statReporter.Start()
 }
 
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
+type infoCommand struct {
+       label string
+       cmd   []string
+}
+
+// LogNodeInfo gathers node information and writes it to the log for
+// debugging purposes.
+func (runner *ContainerRunner) LogNodeInfo() (err error) {
+       w := runner.NewLogWriter("node-info")
+       logger := log.New(w, "node-info", 0)
+
+       commands := []infoCommand{
+               {
+                       label: "Host Information",
+                       cmd:   []string{"uname", "-a"},
+               },
+               {
+                       label: "CPU Information",
+                       cmd:   []string{"cat", "/proc/cpuinfo"},
+               },
+               {
+                       label: "Memory Information",
+                       cmd:   []string{"cat", "/proc/meminfo"},
+               },
+               {
+                       label: "Disk Space",
+                       cmd:   []string{"df", "-m", "/", os.TempDir()},
+               },
+               {
+                       label: "Disk INodes",
+                       cmd:   []string{"df", "-i", "/", os.TempDir()},
+               },
+       }
+
+       // Run commands with informational output to be logged.
+       var out []byte
+       for _, command := range commands {
+               out, err = exec.Command(command.cmd[0], command.cmd[1:]...).CombinedOutput()
+               if err != nil {
+                       return fmt.Errorf("While running command %q: %v",
+                               command.cmd, err)
+               }
+               logger.Println(command.label)
+               for _, line := range strings.Split(string(out), "\n") {
+                       logger.Println(" ", line)
+               }
+       }
+
+       err = w.Close()
+       if err != nil {
+               return fmt.Errorf("While closing node-info logs: %v", err)
+       }
+       return nil
+}
+
+// LogContainerRecord retrieves the raw JSON container record from the
+// API server and saves it to the log collection.
+func (runner *ContainerRunner) LogContainerRecord() (err error) {
+       w := &ArvLogWriter{
+               ArvClient:     runner.ArvClient,
+               UUID:          runner.Container.UUID,
+               loggingStream: "container",
+               writeCloser:   runner.LogCollection.Open("container.json"),
+       }
+
+       // Get Container record JSON from the API Server
+       reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
+       if err != nil {
+               return fmt.Errorf("While retrieving container record from the API server: %v", err)
+       }
+       defer reader.Close()
+
+       dec := json.NewDecoder(reader)
+       dec.UseNumber()
+       var cr map[string]interface{}
+       if err = dec.Decode(&cr); err != nil {
+               return fmt.Errorf("While decoding the container record JSON response: %v", err)
+       }
+       // Re-encode it using indentation to improve readability
+       enc := json.NewEncoder(w)
+       enc.SetIndent("", "    ")
+       if err = enc.Encode(cr); err != nil {
+               return fmt.Errorf("While logging the JSON container record: %v", err)
+       }
+       err = w.Close()
+       if err != nil {
+               return fmt.Errorf("While closing container.json log: %v", err)
+       }
+       return nil
+}
+
+// AttachStreams connects the docker container stdin, stdout and stderr logs
+// to the Arvados logger which logs to Keep and the API server logs table.
 func (runner *ContainerRunner) AttachStreams() (err error) {
 
        runner.CrunchLog.Print("Attaching container streams")
 
-       var containerReader io.Reader
-       containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
-               &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
+       // If stdin mount is provided, attach it to the docker container
+       var stdinRdr arvados.File
+       var stdinJson []byte
+       if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+               if stdinMnt.Kind == "collection" {
+                       var stdinColl arvados.Collection
+                       collId := stdinMnt.UUID
+                       if collId == "" {
+                               collId = stdinMnt.PortableDataHash
+                       }
+                       err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+                       if err != nil {
+                               return fmt.Errorf("While getting stdin collection: %v", err)
+                       }
+
+                       stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+                       if os.IsNotExist(err) {
+                               return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+                       } else if err != nil {
+                               return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+                       }
+               } else if stdinMnt.Kind == "json" {
+                       stdinJson, err = json.Marshal(stdinMnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("While encoding stdin json data: %v", err)
+                       }
+               }
+       }
+
+       stdinUsed := stdinRdr != nil || len(stdinJson) != 0
+       response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
+               dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
                return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
        }
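
One easy-to-miss detail in LogContainerRecord above: dec.UseNumber() keeps numeric fields as json.Number rather than float64, so large integers survive the decode/re-encode round trip that produces the indented container.json. A self-contained illustration (the sample value is arbitrary):

    package main

    import (
        "bytes"
        "encoding/json"
        "os"
    )

    func main() {
        src := bytes.NewBufferString(`{"priority": 9007199254740993}`)
        dec := json.NewDecoder(src)
        dec.UseNumber() // keep numbers as json.Number, not float64
        var rec map[string]interface{}
        if err := dec.Decode(&rec); err != nil {
            panic(err)
        }
        enc := json.NewEncoder(os.Stdout)
        enc.SetIndent("", "    ") // same indent style as container.json
        enc.Encode(rec)           // prints 9007199254740993 unchanged
    }

Without UseNumber, the value would be decoded into a float64 and re-encoded as 9007199254740992.
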
@@ -500,37 +772,76 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        runner.loggingDone = make(chan bool)
 
        if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
-               stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
-               index := strings.LastIndex(stdoutPath, "/")
-               if index > 0 {
-                       subdirs := stdoutPath[:index]
-                       if subdirs != "" {
-                               st, err := os.Stat(runner.HostOutputDir)
-                               if err != nil {
-                                       return fmt.Errorf("While Stat on temp dir: %v", err)
-                               }
-                               stdoutPath := path.Join(runner.HostOutputDir, subdirs)
-                               err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
-                               if err != nil {
-                                       return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
-                               }
-                       }
-               }
-               stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+               stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
                if err != nil {
-                       return fmt.Errorf("While creating stdout file: %v", err)
+                       return err
                }
                runner.Stdout = stdoutFile
        } else {
                runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
        }
-       runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
 
-       go runner.ProcessDockerAttach(containerReader)
+       if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
+               stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
+               if err != nil {
+                       return err
+               }
+               runner.Stderr = stderrFile
+       } else {
+               runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+       }
+
+       if stdinRdr != nil {
+               go func() {
+                       _, err := io.Copy(response.Conn, stdinRdr)
+                       if err != nil {
+                               runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err)
+                               runner.stop()
+                       }
+                       stdinRdr.Close()
+                       response.CloseWrite()
+               }()
+       } else if len(stdinJson) != 0 {
+               go func() {
+                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+                       if err != nil {
+                               runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
+                               runner.stop()
+                       }
+                       response.CloseWrite()
+               }()
+       }
+
+       go runner.ProcessDockerAttach(response.Reader)
 
        return nil
 }
 
+func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
+       stdoutPath := mntPath[len(runner.Container.OutputPath):]
+       index := strings.LastIndex(stdoutPath, "/")
+       if index > 0 {
+               subdirs := stdoutPath[:index]
+               if subdirs != "" {
+                       st, err := os.Stat(runner.HostOutputDir)
+                       if err != nil {
+                               return nil, fmt.Errorf("While Stat on temp dir: %v", err)
+                       }
+                       stdoutPath := filepath.Join(runner.HostOutputDir, subdirs)
+                       err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+                       if err != nil {
+                               return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+                       }
+               }
+       }
+       stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, stdoutPath))
+       if err != nil {
+               return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
+       }
+
+       return stdoutFile, nil
+}
+
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer() error {
        runner.CrunchLog.Print("Creating Docker container")
@@ -543,6 +854,19 @@ func (runner *ContainerRunner) CreateContainer() error {
        for k, v := range runner.Container.Environment {
                runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
        }
+
+       runner.ContainerConfig.Volumes = runner.Volumes
+
+       runner.HostConfig = dockercontainer.HostConfig{
+               Binds: runner.Binds,
+               LogConfig: dockercontainer.LogConfig{
+                       Type: "none",
+               },
+               Resources: dockercontainer.Resources{
+                       CgroupParent: runner.setCgroupParent,
+               },
+       }
+
        if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
                tok, err := runner.ContainerToken()
                if err != nil {
@@ -553,24 +877,28 @@ func (runner *ContainerRunner) CreateContainer() error {
                        "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
                        "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
                )
-               runner.ContainerConfig.NetworkDisabled = false
+               runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
        } else {
-               runner.ContainerConfig.NetworkDisabled = true
+               if runner.enableNetwork == "always" {
+                       runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
+               } else {
+                       runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
+               }
        }
 
-       var err error
-       runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
+       _, stdinUsed := runner.Container.Mounts["stdin"]
+       runner.ContainerConfig.OpenStdin = stdinUsed
+       runner.ContainerConfig.StdinOnce = stdinUsed
+       runner.ContainerConfig.AttachStdin = stdinUsed
+       runner.ContainerConfig.AttachStdout = true
+       runner.ContainerConfig.AttachStderr = true
+
+       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)
        }
 
-       runner.HostConfig = dockerclient.HostConfig{
-               Binds:        runner.Binds,
-               CgroupParent: runner.setCgroupParent,
-               LogConfig: dockerclient.LogConfig{
-                       Type: "none",
-               },
-       }
+       runner.ContainerID = createdBody.ID
 
        return runner.AttachStreams()
 }
@@ -578,24 +906,55 @@ func (runner *ContainerRunner) CreateContainer() error {
 // StartContainer starts the docker container created by CreateContainer.
 func (runner *ContainerRunner) StartContainer() error {
        runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
-       err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
+       runner.cStateLock.Lock()
+       defer runner.cStateLock.Unlock()
+       if runner.cCancelled {
+               return ErrCancelled
+       }
+       err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
+               dockertypes.ContainerStartOptions{})
        if err != nil {
-               return fmt.Errorf("could not start container: %v", err)
+               var advice string
+               if strings.Contains(err.Error(), "no such file or directory") {
+                       advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
+               }
+               return fmt.Errorf("could not start container: %v%s", err, advice)
        }
+       runner.cStarted = true
        return nil
 }
 
 // WaitFinish waits for the container to terminate, capture the exit code, and
 // close the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() error {
+func (runner *ContainerRunner) WaitFinish() (err error) {
        runner.CrunchLog.Print("Waiting for container to finish")
 
-       result := runner.Docker.Wait(runner.ContainerID)
-       wr := <-result
-       if wr.Error != nil {
-               return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
+       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+
+       go func() {
+               <-runner.ArvMountExit
+               if runner.cStarted {
+                       runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
+                       runner.stop()
+               }
+       }()
+
+       var waitBody dockercontainer.ContainerWaitOKBody
+       select {
+       case waitBody = <-waitOk:
+       case err = <-waitErr:
+       }
+
+       // Container isn't running any more
+       runner.cStarted = false
+
+       if err != nil {
+               return fmt.Errorf("container wait: %v", err)
        }
-       runner.ExitCode = &wr.ExitCode
+
+       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+       code := int(waitBody.StatusCode)
+       runner.ExitCode = &code
 
        // wait for stdout/stderr to complete
        <-runner.loggingDone
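
ContainerWait in the new Docker SDK returns two channels, one for the wait result and one for transport errors, replacing the old single WaitResult channel; the select above takes whichever fires first. The "not-running" literal is the value of the SDK constant dockercontainer.WaitConditionNotRunning. The pattern in isolation:

    package main

    import "fmt"

    func main() {
        okCh := make(chan int64, 1)
        errCh := make(chan error, 1)
        okCh <- 137 // e.g. container killed by SIGKILL (128+9)

        // Whichever channel delivers first decides the outcome,
        // exactly like the waitOk/waitErr select in WaitFinish.
        select {
        case status := <-okCh:
            fmt.Println("exit code:", int(status)) // same int64-to-int conversion
        case err := <-errCh:
            fmt.Println("wait failed:", err)
        }
    }
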
@@ -603,6 +962,153 @@ func (runner *ContainerRunner) WaitFinish() error {
        return nil
 }
 
+var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
+
+func (runner *ContainerRunner) derefOutputSymlink(path string, startinfo os.FileInfo) (tgt string, readlinktgt string, info os.FileInfo, err error) {
+       // Follow symlinks if necessary
+       info = startinfo
+       tgt = path
+       readlinktgt = ""
+       nextlink := path
+       for followed := 0; info.Mode()&os.ModeSymlink != 0; followed++ {
+               if followed >= limitFollowSymlinks {
+                       // Got stuck in a loop or just a pathological number of links, give up.
+                       err = fmt.Errorf("Followed more than %v symlinks from path %q", limitFollowSymlinks, path)
+                       return
+               }
+
+               readlinktgt, err = os.Readlink(nextlink)
+               if err != nil {
+                       return
+               }
+
+               tgt = readlinktgt
+               if !strings.HasPrefix(tgt, "/") {
+                       // Relative symlink, resolve it to host path
+                       tgt = filepath.Join(filepath.Dir(path), tgt)
+               }
+               if strings.HasPrefix(tgt, runner.Container.OutputPath+"/") && !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
+                       // Absolute symlink to container output path, adjust it to host output path.
+                       tgt = filepath.Join(runner.HostOutputDir, tgt[len(runner.Container.OutputPath):])
+               }
+               if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
+                       // After dereferencing, symlink target must either be
+                       // within output directory, or must point to a
+                       // collection mount.
+                       err = ErrNotInOutputDir
+                       return
+               }
+
+               info, err = os.Lstat(tgt)
+               if err != nil {
+                       // Lstat on the dereferenced target failed.
+                       err = fmt.Errorf("Symlink in output %q points to invalid location %q: %v",
+                               path[len(runner.HostOutputDir):], readlinktgt, err)
+                       return
+               }
+
+               nextlink = tgt
+       }
+
+       return
+}
+
+var limitFollowSymlinks = 10
+
+// UploadOutputFile uploads files within the output directory, with special handling
+// for symlinks. If the symlink leads to a keep mount, copy the manifest text
+// from the keep mount into the output manifestText.  Ensure that whether
+// symlinks are relative or absolute, every symlink target (even targets that
+// are symlinks themselves) must point to a path in either the output directory
+// or a collection mount.
+//
+// Assumes initial value of "path" is absolute, and located within runner.HostOutputDir.
+func (runner *ContainerRunner) UploadOutputFile(
+       path string,
+       info os.FileInfo,
+       infoerr error,
+       binds []string,
+       walkUpload *WalkUpload,
+       relocateFrom string,
+       relocateTo string,
+       followed int) (manifestText string, err error) {
+
+       if infoerr != nil {
+               // Check the walk error before touching info: on a failed
+               // Lstat, info may be nil.
+               return "", infoerr
+       }
+
+       if info.Mode().IsDir() {
+               return
+       }
+
+       if followed >= limitFollowSymlinks {
+               // Got stuck in a loop or just a pathological number of
+               // directory links, give up.
+               err = fmt.Errorf("Followed more than %v symlinks from path %q", limitFollowSymlinks, path)
+               return
+       }
+
+       // When following symlinks, the source path may need to be logically
+       // relocated to some other path within the output collection.  Remove
+       // the relocateFrom prefix and replace it with relocateTo.
+       relocated := relocateTo + path[len(relocateFrom):]
+
+       tgt, readlinktgt, info, derefErr := runner.derefOutputSymlink(path, info)
+       if derefErr != nil && derefErr != ErrNotInOutputDir {
+               return "", derefErr
+       }
+
+       // go through mounts and try reverse map to collection reference
+       for _, bind := range binds {
+               mnt := runner.Container.Mounts[bind]
+               if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+                       // get path relative to bind
+                       targetSuffix := tgt[len(bind):]
+
+                       // Copy mount and adjust the path to add path relative to the bind
+                       adjustedMount := mnt
+                       adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
+
+                       // Terminates in this keep mount, so add the
+                       // manifest text at appropriate location.
+                       outputSuffix := path[len(runner.HostOutputDir):]
+                       manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
+                       return
+               }
+       }
+
+       // If target is not a collection mount, it must be located within the
+       // output directory, otherwise it is an error.
+       if derefErr == ErrNotInOutputDir {
+               err = fmt.Errorf("Symlink in output %q points to invalid location %q, must point to path within the output directory.",
+                       path[len(runner.HostOutputDir):], readlinktgt)
+               return
+       }
+
+       if info.Mode().IsRegular() {
+               return "", walkUpload.UploadFile(relocated, tgt)
+       }
+
+       if info.Mode().IsDir() {
+               // Symlink leads to directory.  Walk() doesn't follow
+               // directory symlinks, so we walk the target directory
+               // instead.  Within the walk, file paths are relocated
+               // so they appear under the original symlink path.
+               err = filepath.Walk(tgt, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+                       var m string
+                       m, walkerr = runner.UploadOutputFile(walkpath, walkinfo, walkerr,
+                               binds, walkUpload, tgt, relocated, followed+1)
+                       if walkerr == nil {
+                               manifestText = manifestText + m
+                       }
+                       return walkerr
+               })
+               return
+       }
+
+       return
+}
+
-// HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
+// CaptureOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
 func (runner *ContainerRunner) CaptureOutput() error {
        if runner.finalState != "Complete" {
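
The subtle rule in derefOutputSymlink is that a relative target is resolved against the directory containing the link before the HostOutputDir containment check. That resolution step in isolation (the paths are made up):

    package main

    import (
        "fmt"
        "path/filepath"
        "strings"
    )

    // resolve mirrors the relative-target handling in derefOutputSymlink.
    func resolve(link, target, hostOutputDir string) (string, bool) {
        if !strings.HasPrefix(target, "/") {
            // Relative symlink: resolve against the link's directory.
            target = filepath.Join(filepath.Dir(link), target)
        }
        return target, strings.HasPrefix(target, hostOutputDir+"/")
    }

    func main() {
        tgt, ok := resolve("/tmp/out123/a/link", "../b/file", "/tmp/out123")
        fmt.Println(tgt, ok) // /tmp/out123/b/file true
    }
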
@@ -633,14 +1139,42 @@ func (runner *ContainerRunner) CaptureOutput() error {
                return fmt.Errorf("While checking host output path: %v", err)
        }
 
+       // Pre-populate output from the configured mount points
+       var binds []string
+       for bind, mnt := range runner.Container.Mounts {
+               if mnt.Kind == "collection" {
+                       binds = append(binds, bind)
+               }
+       }
+       sort.Strings(binds)
+
        var manifestText string
 
        collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
        _, err = os.Stat(collectionMetafile)
        if err != nil {
                // Regular directory
-               cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
-               manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+
+               cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
+               walkUpload := cw.BeginUpload(runner.HostOutputDir, runner.CrunchLog.Logger)
+
+               var m string
+               err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
+                       m, err = runner.UploadOutputFile(path, info, err, binds, walkUpload, "", "", 0)
+                       if err == nil {
+                               manifestText = manifestText + m
+                       }
+                       return err
+               })
+
+               cw.EndUpload(walkUpload)
+
+               if err != nil {
+                       return fmt.Errorf("While uploading output files: %v", err)
+               }
+
+               m, err = cw.ManifestText()
+               manifestText = manifestText + m
                if err != nil {
                        return fmt.Errorf("While uploading output files: %v", err)
                }
@@ -660,13 +1194,6 @@ func (runner *ContainerRunner) CaptureOutput() error {
                manifestText = rec.ManifestText
        }
 
-       // Pre-populate output from the configured mount points
-       var binds []string
-       for bind, _ := range runner.Container.Mounts {
-               binds = append(binds, bind)
-       }
-       sort.Strings(binds)
-
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
 
@@ -677,30 +1204,10 @@ func (runner *ContainerRunner) CaptureOutput() error {
                        continue
                }
 
-               if strings.Index(bindSuffix, "/") != 0 {
-                       return fmt.Errorf("Expected bind to be of the format '%v/*' but found: %v", runner.Container.OutputPath, bind)
-               }
-
-               jsondata, err := json.Marshal(mnt.Content)
-               if err != nil {
-                       return fmt.Errorf("While marshal of mount content: %v", err)
-               }
-               var content map[string]interface{}
-               err = json.Unmarshal(jsondata, &content)
-               if err != nil {
-                       return fmt.Errorf("While unmarshal of mount content: %v", err)
-               }
-
-               if content["exclude_from_output"] == true {
+               if mnt.ExcludeFromOutput == true {
                        continue
                }
 
-               idx := strings.Index(mnt.PortableDataHash, "/")
-               if idx > 0 {
-                       mnt.Path = mnt.PortableDataHash[idx:]
-                       mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
-               }
-
                // append to manifest_text
                m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
                if err != nil {
@@ -712,10 +1219,13 @@ func (runner *ContainerRunner) CaptureOutput() error {
 
        // Save output
        var response arvados.Collection
+       mft := manifest.Manifest{Text: manifestText}
+       manifestText = mft.Extract(".", ".").Text
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
+                       "ensure_unique_name": true,
                        "collection": arvadosclient.Dict{
-                               "trash_at":      time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+                               "is_trashed":    true,
                                "name":          "output for " + runner.Container.UUID,
                                "manifest_text": manifestText}},
                &response)
@@ -726,6 +1236,8 @@ func (runner *ContainerRunner) CaptureOutput() error {
        return nil
 }
 
+var outputCollections = make(map[string]arvados.Collection)
+
 // Fetch the collection for the mnt.PortableDataHash
 // Return the manifest_text fragment corresponding to the specified mnt.Path
 //  after making any required updates.
@@ -743,94 +1255,79 @@ func (runner *ContainerRunner) CaptureOutput() error {
 //    "path":"/subdir1/subdir2"
 //    "path":"/subdir/filename" etc
 func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
-       var collection arvados.Collection
-       err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
-       if err != nil {
-               return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
-       }
-
-       manifestText := ""
-       if mnt.Path == "" || mnt.Path == "/" {
-               // no path specified; return the entire manifest text
-               manifestText = collection.ManifestText
-               manifestText = strings.Replace(manifestText, "./", "."+bindSuffix+"/", -1)
-               manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1)
-       } else {
-               // either a single stream or file from a stream is being sought
-               bindIdx := strings.LastIndex(bindSuffix, "/")
-               var bindSubdir, bindFileName string
-               if bindIdx >= 0 {
-                       bindSubdir = "." + bindSuffix[0:bindIdx]
-                       bindFileName = bindSuffix[bindIdx+1:]
-               }
-               pathIdx := strings.LastIndex(mnt.Path, "/")
-               var pathSubdir, pathFileName string
-               if pathIdx >= 0 {
-                       pathSubdir = "." + mnt.Path[0:pathIdx]
-                       pathFileName = mnt.Path[pathIdx+1:]
-               }
-               streams := strings.Split(collection.ManifestText, "\n")
-               for _, stream := range streams {
-                       tokens := strings.Split(stream, " ")
-                       if tokens[0] == "."+mnt.Path {
-                               // path refers to this complete stream
-                               adjustedStream := strings.Replace(stream, "."+mnt.Path, "."+bindSuffix, -1)
-                               manifestText = adjustedStream + "\n"
-                               break
-                       } else {
-                               // look for a matching file in this stream
-                               if tokens[0] == pathSubdir {
-                                       // path refers to a file in this stream
-                                       for _, token := range tokens {
-                                               if strings.Index(token, ":"+pathFileName) > 0 {
-                                                       // found the file in the stream; discard all other file tokens
-                                                       for _, t := range tokens {
-                                                               if strings.Index(t, ":") == -1 {
-                                                                       manifestText += (" " + t)
-                                                               } else {
-                                                                       break // done reading all non-file tokens
-                                                               }
-                                                       }
-                                                       manifestText = strings.Trim(manifestText, " ")
-                                                       token = strings.Replace(token, ":"+pathFileName, ":"+bindFileName, -1)
-                                                       manifestText += (" " + token + "\n")
-                                                       manifestText = strings.Replace(manifestText, pathSubdir, bindSubdir, -1)
-                                                       break
-                                               }
-                                       }
-                               }
-                       }
+       collection := outputCollections[mnt.PortableDataHash]
+       if collection.PortableDataHash == "" {
+               err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
+               if err != nil {
+                       return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
                }
+               outputCollections[mnt.PortableDataHash] = collection
        }
 
-       return manifestText, nil
-}
+       if collection.ManifestText == "" {
+               runner.CrunchLog.Printf("No manifest text for collection %v", collection.PortableDataHash)
+               return "", nil
+       }
 
-func (runner *ContainerRunner) loadDiscoveryVars() {
-       tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
-       if err != nil {
-               log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err)
+       mft := manifest.Manifest{Text: collection.ManifestText}
+       extracted := mft.Extract(mnt.Path, bindSuffix)
+       if extracted.Err != nil {
+               return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error())
        }
-       runner.trashLifetime = time.Duration(tl.(float64)) * time.Second
+       return extracted.Text, nil
 }
 
 func (runner *ContainerRunner) CleanupDirs() {
        if runner.ArvMount != nil {
-               umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
-               umnterr := umount.Run()
+               var delay int64 = 8
+               umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
+               umount.Stdout = runner.CrunchLog
+               umount.Stderr = runner.CrunchLog
+               runner.CrunchLog.Printf("Running %v", umount.Args)
+               umnterr := umount.Start()
+
                if umnterr != nil {
-                       runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
+                       runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
+               } else {
+                       // If arv-mount --unmount gets stuck for any reason, we
+                       // don't want to wait for it forever.  Do Wait() in a goroutine
+                       // so it doesn't block crunch-run.
+                       umountExit := make(chan error)
+                       go func() {
+                               mnterr := umount.Wait()
+                               if mnterr != nil {
+                                       runner.CrunchLog.Printf("Error unmounting: %v", mnterr)
+                               }
+                               umountExit <- mnterr
+                       }()
+
+                       for again := true; again; {
+                               again = false
+                               select {
+                               case <-umountExit:
+                                       umount = nil
+                                       again = true
+                               case <-runner.ArvMountExit:
+                                       // arv-mount exited on its own; "again"
+                                       // stays false, so the loop ends.
+                                       break
+                               case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
+                                       runner.CrunchLog.Printf("Timed out waiting for unmount")
+                                       if umount != nil {
+                                               umount.Process.Kill()
+                                       }
+                                       runner.ArvMount.Process.Kill()
+                               }
+                       }
                }
+       }
 
-               mnterr := <-runner.ArvMountExit
-               if mnterr != nil {
-                       runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
+       if runner.ArvMountPoint != "" {
+               if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
+                       runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
                }
        }
 
        for _, tmpdir := range runner.CleanupTempDir {
-               rmerr := os.RemoveAll(tmpdir)
-               if rmerr != nil {
+               if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
                        runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
                }
        }
@@ -839,14 +1336,19 @@ func (runner *ContainerRunner) CleanupDirs() {
 // CommitLogs posts the collection containing the final container logs.
 func (runner *ContainerRunner) CommitLogs() error {
        runner.CrunchLog.Print(runner.finalState)
+
+       if runner.arvMountLog != nil {
+               runner.arvMountLog.Close()
+       }
        runner.CrunchLog.Close()
 
-       // Closing CrunchLog above allows it to be committed to Keep at this
+       // Closing CrunchLog above allows the logs to be committed to Keep at this
        // point, but re-open crunch log with ArvClient in case there are any
-       // other further (such as failing to write the log to Keep!) while
-       // shutting down
-       runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
-               "crunch-run", nil})
+       // other errors (such as failing to write the log to Keep!)
+       // while shutting down
+       runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+               UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
+       runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
 
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
@@ -865,8 +1367,9 @@ func (runner *ContainerRunner) CommitLogs() error {
        var response arvados.Collection
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
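+                       // With ensure_unique_name, the API server appends a
+                       // suffix if a collection with this name already
+                       // exists, instead of returning a name conflict error.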
+                       "ensure_unique_name": true,
                        "collection": arvadosclient.Dict{
-                               "trash_at":      time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
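+                               // Create the log collection already trashed
+                               // (replacing the old trash_at scheme); the
+                               // container record keeps its own reference to
+                               // the log data by portable data hash.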
+                               "is_trashed":    true,
                                "name":          "logs for " + runner.Container.UUID,
                                "manifest_text": mt}},
                &response)
@@ -879,9 +1382,9 @@ func (runner *ContainerRunner) CommitLogs() error {
 
 // UpdateContainerRunning updates the container state to "Running"
 func (runner *ContainerRunner) UpdateContainerRunning() error {
-       runner.CancelLock.Lock()
-       defer runner.CancelLock.Unlock()
-       if runner.Cancelled {
+       runner.cStateLock.Lock()
+       defer runner.cStateLock.Unlock()
+       if runner.cCancelled {
                return ErrCancelled
        }
        return runner.ArvClient.Update("containers", runner.Container.UUID,
@@ -925,14 +1428,18 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
 
 // IsCancelled returns the value of Cancelled, with goroutine safety.
 func (runner *ContainerRunner) IsCancelled() bool {
-       runner.CancelLock.Lock()
-       defer runner.CancelLock.Unlock()
-       return runner.Cancelled
+       runner.cStateLock.Lock()
+       defer runner.cStateLock.Unlock()
+       return runner.cCancelled
 }
 
 // NewArvLogWriter creates an ArvLogWriter
 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
-       return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
+       return &ArvLogWriter{
+               ArvClient:     runner.ArvClient,
+               UUID:          runner.Container.UUID,
+               loggingStream: name,
+               writeCloser:   runner.LogCollection.Open(name + ".txt")}
 }
 
 // Run the full container lifecycle.
@@ -946,12 +1453,16 @@ func (runner *ContainerRunner) Run() (err error) {
                runner.CrunchLog.Printf("Executing on host '%s'", hostname)
        }
 
-       // Clean up temporary directories _after_ finalizing
-       // everything (if we've made any by then)
-       defer runner.CleanupDirs()
-
        runner.finalState = "Queued"
 
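+       // Deferred functions run LIFO, so this cleanup runs last, after the
+       // finalization defer below has committed logs and updated the
+       // container record.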
+       defer func() {
+               runner.stopSignals()
+               runner.CleanupDirs()
+
+               runner.CrunchLog.Printf("crunch-run finished")
+               runner.CrunchLog.Close()
+       }()
+
        defer func() {
                // checkErr prints e (unless it's nil) and sets err to
                // e (unless err is already non-nil). Thus, if err
@@ -966,13 +1477,16 @@ func (runner *ContainerRunner) Run() (err error) {
                        if err == nil {
                                err = e
                        }
+                       if runner.finalState == "Complete" {
+                               // There was an error in the finalization.
+                               runner.finalState = "Cancelled"
+                       }
                }
 
                // Log the error encountered in Run(), if any
                checkErr(err)
 
                if runner.finalState == "Queued" {
-                       runner.CrunchLog.Close()
                        runner.UpdateContainerFinal()
                        return
                }
@@ -986,26 +1500,24 @@ func (runner *ContainerRunner) Run() (err error) {
                checkErr(runner.CaptureOutput())
                checkErr(runner.CommitLogs())
                checkErr(runner.UpdateContainerFinal())
-
-               // The real log is already closed, but then we opened
-               // a new one in case we needed to log anything while
-               // finalizing.
-               runner.CrunchLog.Close()
        }()
 
-       err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
+       err = runner.fetchContainerRecord()
        if err != nil {
-               err = fmt.Errorf("While getting container record: %v", err)
                return
        }
 
        // setup signal handling
-       runner.SetupSignals()
+       runner.setupSignals()
 
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
-               runner.finalState = "Cancelled"
+               if !runner.checkBrokenNode(err) {
+                       // Failed to load image but not due to a "broken node"
+                       // condition, probably user error.
+                       runner.finalState = "Cancelled"
+               }
                err = fmt.Errorf("While loading container image: %v", err)
                return
        }
@@ -1023,7 +1535,16 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       runner.StartCrunchstat()
+       // Gather and record node information
+       err = runner.LogNodeInfo()
+       if err != nil {
+               return
+       }
+       // Save the container record as container.json in the log collection
+       err = runner.LogContainerRecord()
+       if err != nil {
+               return
+       }
 
        if runner.IsCancelled() {
                return
@@ -1035,8 +1556,11 @@ func (runner *ContainerRunner) Run() (err error) {
        }
        runner.finalState = "Cancelled"
 
+       runner.StartCrunchstat()
+
        err = runner.StartContainer()
        if err != nil {
+               runner.checkBrokenNode(err)
                return
        }
 
@@ -1047,6 +1571,24 @@ func (runner *ContainerRunner) Run() (err error) {
        return
 }
 
+// Fetch the current container record (uuid = runner.Container.UUID)
+// into runner.Container.
+func (runner *ContainerRunner) fetchContainerRecord() error {
+       reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
+       if err != nil {
+               return fmt.Errorf("error fetching container record: %v", err)
+       }
+       defer reader.Close()
+
+       dec := json.NewDecoder(reader)
+       dec.UseNumber()
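+       // UseNumber() keeps numeric fields as json.Number instead of
+       // float64, so large integers (e.g. RAM sizes in runtime
+       // constraints) round-trip without losing precision.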
+       err = dec.Decode(&runner.Container)
+       if err != nil {
+               return fmt.Errorf("error decoding container record: %v", err)
+       }
+       return nil
+}
+
 // NewContainerRunner creates a new container runner.
 func NewContainerRunner(api IArvadosClient,
        kc IKeepClient,
@@ -1057,11 +1599,13 @@ func NewContainerRunner(api IArvadosClient,
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
        cr.MkTempDir = ioutil.TempDir
-       cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
+       cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
-       cr.loadDiscoveryVars()
+
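+       // Configure log throttling limits from the API server's discovery
+       // document.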
+       loadLogThrottleParams(api)
+
        return cr
 }
 
@@ -1071,6 +1615,15 @@ func main() {
        cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
        cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
+       enableNetwork := flag.String("container-enable-networking", "default",
+               `Specify whether networking should be enabled for the container.  One of 'default', 'always':
+       default: only enable networking if the container requests it.
+       always:  containers always have networking enabled
+       `)
+       networkMode := flag.String("container-network-mode", "default",
+               `Set the networking mode for the container.  Corresponds to the Docker network mode (--net).
+       `)
+       memprofile := flag.String("memprofile", "", "write memory profile to `file` after running the container")
        flag.Parse()
 
        containerId := flag.Arg(0)
@@ -1085,32 +1638,58 @@ func main() {
        }
        api.Retries = 8
 
-       var kc *keepclient.KeepClient
-       kc, err = keepclient.MakeKeepClient(api)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
-       }
-       kc.Retries = 4
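+       // Defer checking kcerr and dockererr until the ContainerRunner
+       // exists, so the errors can be reported through the crunch-run log.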
+       kc, kcerr := keepclient.MakeKeepClient(api)
 
-       var docker *dockerclient.DockerClient
-       docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
+       // API version 1.21 corresponds to Docker 1.9, which is currently the
+       // minimum version we want to support.
+       docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
+       dockerClientProxy := ThinDockerClientProxy{Docker: docker}
+
+       cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+
+       if kcerr != nil {
+               cr.CrunchLog.Printf("%s: %v", containerId, kcerr)
+               cr.CrunchLog.Close()
+               os.Exit(1)
+       }
+       if dockererr != nil {
+               cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+               cr.checkBrokenNode(dockererr)
+               cr.CrunchLog.Close()
+               os.Exit(1)
        }
 
-       cr := NewContainerRunner(api, kc, docker, containerId)
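+       // A small block cache keeps memory use down; crunch-run mostly
+       // streams data through Keep once and doesn't benefit from caching
+       // many blocks.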
+       kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
+       kc.Retries = 4
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent
+       cr.enableNetwork = *enableNetwork
+       cr.networkMode = *networkMode
        if *cgroupParentSubsystem != "" {
                p := findCgroup(*cgroupParentSubsystem)
                cr.setCgroupParent = p
                cr.expectCgroupParent = p
        }
 
-       err = cr.Run()
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
+       runerr := cr.Run()
+
+       if *memprofile != "" {
+               f, err := os.Create(*memprofile)
+               if err != nil {
+                       log.Printf("could not create memory profile: %v", err)
+               } else {
+                       runtime.GC() // get up-to-date statistics
+                       if err := pprof.WriteHeapProfile(f); err != nil {
+                               log.Printf("could not write memory profile: %v", err)
+                       }
+                       if closeerr := f.Close(); closeerr != nil {
+                               log.Printf("closing memprofile file: %v", closeerr)
+                       }
+               }
        }
 
+       if runerr != nil {
+               log.Fatalf("%s: %v", containerId, runerr)
+       }
 }