17813: Refactor singularity image loading / caching / conversion
[arvados.git] / lib / crunchrun / crunchrun.go
index 730185c1969f2af43b6cb76148f07541711ec451..d4a8650726d2aae9d6b222cf0a6baa89a2091dad 100644 (file)
@@ -34,11 +34,6 @@ import (
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "git.arvados.org/arvados.git/sdk/go/manifest"
        "golang.org/x/net/context"
-
-       dockertypes "github.com/docker/docker/api/types"
-       dockercontainer "github.com/docker/docker/api/types/container"
-       dockernetwork "github.com/docker/docker/api/types/network"
-       dockerclient "github.com/docker/docker/client"
 )
 
 type command struct{}
@@ -60,11 +55,12 @@ var ErrCancelled = errors.New("Cancelled")
 
 // IKeepClient is the minimal Keep API methods used by crunch-run.
 type IKeepClient interface {
-       PutB(buf []byte) (string, int, error)
+       BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error)
        ReadAt(locator string, p []byte, off int) (int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
        LocalLocator(locator string) (string, error)
        ClearBlockCache()
+       SetStorageClasses(sc []string)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -74,20 +70,6 @@ type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
 
 type MkTempDir func(string, string) (string, error)
 
-// ThinDockerClient is the minimal Docker client interface used by crunch-run.
-type ThinDockerClient interface {
-       ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
-       ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
-               networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
-       ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
-       ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
-       ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
-       ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error)
-       ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
-       ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
-       ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
-}
-
 type PsProcess interface {
        CmdlineSlice() ([]string, error)
 }
@@ -95,7 +77,10 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-       Docker ThinDockerClient
+       executor       containerExecutor
+       executorStdin  io.Closer
+       executorStdout io.Closer
+       executorStderr io.Closer
 
        // Dispatcher client is initialized with the Dispatcher token.
        // This is a privileged token used to manage container status
@@ -119,35 +104,28 @@ type ContainerRunner struct {
        ContainerArvClient  IArvadosClient
        ContainerKeepClient IKeepClient
 
-       Container       arvados.Container
-       ContainerConfig dockercontainer.Config
-       HostConfig      dockercontainer.HostConfig
-       token           string
-       ContainerID     string
-       ExitCode        *int
-       NewLogWriter    NewLogWriter
-       loggingDone     chan bool
-       CrunchLog       *ThrottledLogger
-       Stdout          io.WriteCloser
-       Stderr          io.WriteCloser
-       logUUID         string
-       logMtx          sync.Mutex
-       LogCollection   arvados.CollectionFileSystem
-       LogsPDH         *string
-       RunArvMount     RunArvMount
-       MkTempDir       MkTempDir
-       ArvMount        *exec.Cmd
-       ArvMountPoint   string
-       HostOutputDir   string
-       Binds           []string
-       Volumes         map[string]struct{}
-       OutputPDH       *string
-       SigChan         chan os.Signal
-       ArvMountExit    chan error
-       SecretMounts    map[string]arvados.Mount
-       MkArvClient     func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
-       finalState      string
-       parentTemp      string
+       Container     arvados.Container
+       token         string
+       ExitCode      *int
+       NewLogWriter  NewLogWriter
+       CrunchLog     *ThrottledLogger
+       logUUID       string
+       logMtx        sync.Mutex
+       LogCollection arvados.CollectionFileSystem
+       LogsPDH       *string
+       RunArvMount   RunArvMount
+       MkTempDir     MkTempDir
+       ArvMount      *exec.Cmd
+       ArvMountPoint string
+       HostOutputDir string
+       Volumes       map[string]struct{}
+       OutputPDH     *string
+       SigChan       chan os.Signal
+       ArvMountExit  chan error
+       SecretMounts  map[string]arvados.Mount
+       MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
+       finalState    string
+       parentTemp    string
 
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
@@ -171,17 +149,20 @@ type ContainerRunner struct {
 
        cStateLock sync.Mutex
        cCancelled bool // StopContainer() invoked
-       cRemoved   bool // docker confirmed the container no longer exists
 
-       enableNetwork string // one of "default" or "always"
-       networkMode   string // passed through to HostConfig.NetworkMode
-       arvMountLog   *ThrottledLogger
+       enableMemoryLimit bool
+       enableNetwork     string // one of "default" or "always"
+       networkMode       string // "none", "host", or "" -- passed through to executor
+       arvMountLog       *ThrottledLogger
 
        containerWatchdogInterval time.Duration
+
+       gateway Gateway
 }
 
-// setupSignals sets up signal handling to gracefully terminate the underlying
-// Docker container and update state when receiving a TERM, INT or QUIT signal.
+// setupSignals sets up signal handling to gracefully terminate the
+// underlying container and update state when receiving a TERM, INT or
+// QUIT signal.
 func (runner *ContainerRunner) setupSignals() {
        runner.SigChan = make(chan os.Signal, 1)
        signal.Notify(runner.SigChan, syscall.SIGTERM)
@@ -195,24 +176,18 @@ func (runner *ContainerRunner) setupSignals() {
        }(runner.SigChan)
 }
 
-// stop the underlying Docker container.
+// stop the underlying container.
 func (runner *ContainerRunner) stop(sig os.Signal) {
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
        if sig != nil {
                runner.CrunchLog.Printf("caught signal: %v", sig)
        }
-       if runner.ContainerID == "" {
-               return
-       }
        runner.cCancelled = true
-       runner.CrunchLog.Printf("removing container")
-       err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+       runner.CrunchLog.Printf("stopping container")
+       err := runner.executor.Stop()
        if err != nil {
-               runner.CrunchLog.Printf("error removing container: %s", err)
-       }
-       if err == nil || strings.Contains(err.Error(), "No such container: "+runner.ContainerID) {
-               runner.cRemoved = true
+               runner.CrunchLog.Printf("error stopping container: %s", err)
        }
 }
 
@@ -260,57 +235,41 @@ func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
-// LoadImage determines the docker image id from the container record and
-// checks if it is available in the local Docker image store.  If not, it loads
-// the image from Keep.
-func (runner *ContainerRunner) LoadImage() (err error) {
-
+// LoadImage finds the single .tar file in the container image
+// collection (read through the arv-mount by_id directory), has the
+// executor load it, and returns the image id (file name minus ".tar").
+func (runner *ContainerRunner) LoadImage() (string, error) {
        runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
 
-       var collection arvados.Collection
-       err = runner.ContainerArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
+       d, err := os.Open(runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage)
+       if err != nil {
+               return "", err
+       }
+       defer d.Close()
+       allfiles, err := d.Readdirnames(-1)
        if err != nil {
-               return fmt.Errorf("While getting container image collection: %v", err)
+               return "", err
        }
-       manifest := manifest.Manifest{Text: collection.ManifestText}
-       var img, imageID string
-       for ms := range manifest.StreamIter() {
-               img = ms.FileStreamSegments[0].Name
-               if !strings.HasSuffix(img, ".tar") {
-                       return fmt.Errorf("First file in the container image collection does not end in .tar")
+       var tarfiles []string
+       for _, fnm := range allfiles {
+               if strings.HasSuffix(fnm, ".tar") {
+                       tarfiles = append(tarfiles, fnm)
                }
-               imageID = img[:len(img)-4]
        }
+       if len(tarfiles) == 0 {
+               return "", fmt.Errorf("image collection does not include a .tar image file")
+       }
+       if len(tarfiles) > 1 {
+               return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles)
+       }
+       imageID := strings.TrimSuffix(tarfiles[0], ".tar")
+       runner.CrunchLog.Printf("Using Docker image id %q", imageID)
 
-       runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
-
-       _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
+       runner.CrunchLog.Print("Loading Docker image from keep")
+       err = runner.executor.LoadImage(imageID, runner.Container, runner.ArvMountPoint,
+               runner.containerClient, runner.ContainerKeepClient)
        if err != nil {
-               runner.CrunchLog.Print("Loading Docker image from keep")
-
-               var readCloser io.ReadCloser
-               readCloser, err = runner.ContainerKeepClient.ManifestFileReader(manifest, img)
-               if err != nil {
-                       return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
-               }
-
-               response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
-               if err != nil {
-                       return fmt.Errorf("While loading container image into Docker: %v", err)
-               }
-
-               defer response.Body.Close()
-               rbody, err := ioutil.ReadAll(response.Body)
-               if err != nil {
-                       return fmt.Errorf("Reading response to image load: %v", err)
-               }
-               runner.CrunchLog.Printf("Docker response: %s", rbody)
-       } else {
-               runner.CrunchLog.Print("Docker image is available")
+               return "", err
        }
 
-       runner.ContainerConfig.Image = imageID
-
-       runner.ContainerKeepClient.ClearBlockCache()
-
-       return nil
+       return imageID, nil
 }
 
 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
@@ -332,7 +291,7 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
        }
        runner.arvMountLog = NewThrottledLogger(w)
        c.Stdout = runner.arvMountLog
-       c.Stderr = runner.arvMountLog
+       c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr)
 
        runner.CrunchLog.Printf("Running %v", c.Args)
 
@@ -416,16 +375,18 @@ func copyfile(src string, dst string) (err error) {
        return nil
 }
 
-func (runner *ContainerRunner) SetupMounts() (err error) {
-       err = runner.SetupArvMountPoint("keep")
+func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
+       bindmounts := map[string]bindmount{}
+       err := runner.SetupArvMountPoint("keep")
        if err != nil {
-               return fmt.Errorf("While creating keep mount temp dir: %v", err)
+               return nil, fmt.Errorf("While creating keep mount temp dir: %v", err)
        }
 
        token, err := runner.ContainerToken()
        if err != nil {
-               return fmt.Errorf("could not get container token: %s", err)
+               return nil, fmt.Errorf("could not get container token: %s", err)
        }
+       // Deliberately not logged: the container token is a credential.
 
        pdhOnly := true
        tmpcount := 0
@@ -433,6 +394,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                "--foreground",
                "--allow-other",
                "--read-write",
+               "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
                fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
 
        if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
@@ -440,8 +402,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        }
 
        collectionPaths := []string{}
-       runner.Binds = nil
-       runner.Volumes = make(map[string]struct{})
        needCertMount := true
        type copyFile struct {
                src  string
@@ -455,11 +415,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        }
        for bind := range runner.SecretMounts {
                if _, ok := runner.Container.Mounts[bind]; ok {
-                       return fmt.Errorf("secret mount %q conflicts with regular mount", bind)
+                       return nil, fmt.Errorf("secret mount %q conflicts with regular mount", bind)
                }
                if runner.SecretMounts[bind].Kind != "json" &&
                        runner.SecretMounts[bind].Kind != "text" {
-                       return fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted",
+                       return nil, fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted",
                                bind, runner.SecretMounts[bind].Kind)
                }
                binds = append(binds, bind)
@@ -474,7 +434,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                if bind == "stdout" || bind == "stderr" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
-                               return fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind)
+                               return nil, fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind)
                        }
 
                        // Does path start with OutputPath?
@@ -483,14 +443,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                prefix += "/"
                        }
                        if !strings.HasPrefix(mnt.Path, prefix) {
-                               return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
+                               return nil, fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
                        }
                }
 
                if bind == "stdin" {
                        // Is it a "collection" mount kind?
                        if mnt.Kind != "collection" && mnt.Kind != "json" {
-                               return fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind)
+                               return nil, fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind)
                        }
                }
 
@@ -500,7 +460,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
                if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
                        if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" {
-                               return fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
+                               return nil, fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
                        }
                }
 
@@ -508,17 +468,17 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                case mnt.Kind == "collection" && bind != "stdin":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
-                               return fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
+                               return nil, fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
                        }
                        if mnt.UUID != "" {
                                if mnt.Writable {
-                                       return fmt.Errorf("writing to existing collections currently not permitted")
+                                       return nil, fmt.Errorf("writing to existing collections currently not permitted")
                                }
                                pdhOnly = false
                                src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
                        } else if mnt.PortableDataHash != "" {
                                if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
-                                       return fmt.Errorf("can never write to a collection specified by portable data hash")
+                                       return nil, fmt.Errorf("can never write to a collection specified by portable data hash")
                                }
                                idx := strings.Index(mnt.PortableDataHash, "/")
                                if idx > 0 {
@@ -544,14 +504,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if mnt.Writable {
                                if bind == runner.Container.OutputPath {
                                        runner.HostOutputDir = src
-                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
+                                       bindmounts[bind] = bindmount{HostPath: src}
                                } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
                                        copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
                                } else {
-                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
+                                       bindmounts[bind] = bindmount{HostPath: src}
                                }
                        } else {
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
+                               bindmounts[bind] = bindmount{HostPath: src, ReadOnly: true}
                        }
                        collectionPaths = append(collectionPaths, src)
 
@@ -559,17 +519,17 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        var tmpdir string
                        tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
                        if err != nil {
-                               return fmt.Errorf("while creating mount temp dir: %v", err)
+                               return nil, fmt.Errorf("while creating mount temp dir: %v", err)
                        }
                        st, staterr := os.Stat(tmpdir)
                        if staterr != nil {
-                               return fmt.Errorf("while Stat on temp dir: %v", staterr)
+                               return nil, fmt.Errorf("while Stat on temp dir: %v", staterr)
                        }
                        err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
                        if staterr != nil {
-                               return fmt.Errorf("while Chmod temp dir: %v", err)
+                               return nil, fmt.Errorf("while Chmod temp dir: %v", err)
                        }
-                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
+                       bindmounts[bind] = bindmount{HostPath: tmpdir}
                        if bind == runner.Container.OutputPath {
                                runner.HostOutputDir = tmpdir
                        }
@@ -579,53 +539,53 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if mnt.Kind == "json" {
                                filedata, err = json.Marshal(mnt.Content)
                                if err != nil {
-                                       return fmt.Errorf("encoding json data: %v", err)
+                                       return nil, fmt.Errorf("encoding json data: %v", err)
                                }
                        } else {
                                text, ok := mnt.Content.(string)
                                if !ok {
-                                       return fmt.Errorf("content for mount %q must be a string", bind)
+                                       return nil, fmt.Errorf("content for mount %q must be a string", bind)
                                }
                                filedata = []byte(text)
                        }
 
                        tmpdir, err := runner.MkTempDir(runner.parentTemp, mnt.Kind)
                        if err != nil {
-                               return fmt.Errorf("creating temp dir: %v", err)
+                               return nil, fmt.Errorf("creating temp dir: %v", err)
                        }
                        tmpfn := filepath.Join(tmpdir, "mountdata."+mnt.Kind)
                        err = ioutil.WriteFile(tmpfn, filedata, 0444)
                        if err != nil {
-                               return fmt.Errorf("writing temp file: %v", err)
+                               return nil, fmt.Errorf("writing temp file: %v", err)
                        }
                        if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
                                copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
                        } else {
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
+                               bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
                        }
 
                case mnt.Kind == "git_tree":
                        tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
                        if err != nil {
-                               return fmt.Errorf("creating temp dir: %v", err)
+                               return nil, fmt.Errorf("creating temp dir: %v", err)
                        }
                        err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
                        if err != nil {
-                               return err
+                               return nil, err
                        }
-                       runner.Binds = append(runner.Binds, tmpdir+":"+bind+":ro")
+                       bindmounts[bind] = bindmount{HostPath: tmpdir, ReadOnly: true}
                }
        }
 
        if runner.HostOutputDir == "" {
-               return fmt.Errorf("output path does not correspond to a writable mount point")
+               return nil, fmt.Errorf("output path does not correspond to a writable mount point")
        }
 
        if needCertMount && runner.Container.RuntimeConstraints.API {
                for _, certfile := range arvadosclient.CertFiles {
                        _, err := os.Stat(certfile)
                        if err == nil {
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile))
+                               bindmounts["/etc/arvados/ca-certificates.crt"] = bindmount{HostPath: certfile, ReadOnly: true}
                                break
                        }
                }
@@ -636,24 +596,25 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        } else {
                arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
        }
+       arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
        arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
 
        runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
        if err != nil {
-               return fmt.Errorf("while trying to start arv-mount: %v", err)
+               return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
        }
 
        for _, p := range collectionPaths {
                _, err = os.Stat(p)
                if err != nil {
-                       return fmt.Errorf("while checking that input files exist: %v", err)
+                       return nil, fmt.Errorf("while checking that input files exist: %v", err)
                }
        }
 
        for _, cp := range copyFiles {
                st, err := os.Stat(cp.src)
                if err != nil {
-                       return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+                       return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
                }
                if st.IsDir() {
                        err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
@@ -684,59 +645,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                }
                if err != nil {
-                       return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+                       return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
                }
        }
 
-       return nil
-}
-
-func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
-       // Handle docker log protocol
-       // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
-       defer close(runner.loggingDone)
-
-       header := make([]byte, 8)
-       var err error
-       for err == nil {
-               _, err = io.ReadAtLeast(containerReader, header, 8)
-               if err != nil {
-                       if err == io.EOF {
-                               err = nil
-                       }
-                       break
-               }
-               readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
-               if header[0] == 1 {
-                       // stdout
-                       _, err = io.CopyN(runner.Stdout, containerReader, readsize)
-               } else {
-                       // stderr
-                       _, err = io.CopyN(runner.Stderr, containerReader, readsize)
-               }
-       }
-
-       if err != nil {
-               runner.CrunchLog.Printf("error reading docker logs: %v", err)
-       }
-
-       err = runner.Stdout.Close()
-       if err != nil {
-               runner.CrunchLog.Printf("error closing stdout logs: %v", err)
-       }
-
-       err = runner.Stderr.Close()
-       if err != nil {
-               runner.CrunchLog.Printf("error closing stderr logs: %v", err)
-       }
-
-       if runner.statReporter != nil {
-               runner.statReporter.Stop()
-               err = runner.statLogger.Close()
-               if err != nil {
-                       runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
-               }
-       }
+       return bindmounts, nil
 }
 
 func (runner *ContainerRunner) stopHoststat() error {
@@ -773,7 +686,7 @@ func (runner *ContainerRunner) startCrunchstat() error {
        }
        runner.statLogger = NewThrottledLogger(w)
        runner.statReporter = &crunchstat.Reporter{
-               CID:          runner.ContainerID,
+               CID:          runner.executor.CgroupID(),
                Logger:       log.New(runner.statLogger, "", 0),
                CgroupParent: runner.expectCgroupParent,
                CgroupRoot:   runner.cgroupRoot,
@@ -936,102 +849,6 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
        return true, nil
 }
 
-// AttachStreams connects the docker container stdin, stdout and stderr logs
-// to the Arvados logger which logs to Keep and the API server logs table.
-func (runner *ContainerRunner) AttachStreams() (err error) {
-
-       runner.CrunchLog.Print("Attaching container streams")
-
-       // If stdin mount is provided, attach it to the docker container
-       var stdinRdr arvados.File
-       var stdinJSON []byte
-       if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
-               if stdinMnt.Kind == "collection" {
-                       var stdinColl arvados.Collection
-                       collID := stdinMnt.UUID
-                       if collID == "" {
-                               collID = stdinMnt.PortableDataHash
-                       }
-                       err = runner.ContainerArvClient.Get("collections", collID, nil, &stdinColl)
-                       if err != nil {
-                               return fmt.Errorf("While getting stdin collection: %v", err)
-                       }
-
-                       stdinRdr, err = runner.ContainerKeepClient.ManifestFileReader(
-                               manifest.Manifest{Text: stdinColl.ManifestText},
-                               stdinMnt.Path)
-                       if os.IsNotExist(err) {
-                               return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
-                       } else if err != nil {
-                               return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
-                       }
-               } else if stdinMnt.Kind == "json" {
-                       stdinJSON, err = json.Marshal(stdinMnt.Content)
-                       if err != nil {
-                               return fmt.Errorf("While encoding stdin json data: %v", err)
-                       }
-               }
-       }
-
-       stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
-       response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
-       if err != nil {
-               return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
-       }
-
-       runner.loggingDone = make(chan bool)
-
-       if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
-               stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
-               if err != nil {
-                       return err
-               }
-               runner.Stdout = stdoutFile
-       } else if w, err := runner.NewLogWriter("stdout"); err != nil {
-               return err
-       } else {
-               runner.Stdout = NewThrottledLogger(w)
-       }
-
-       if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
-               stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
-               if err != nil {
-                       return err
-               }
-               runner.Stderr = stderrFile
-       } else if w, err := runner.NewLogWriter("stderr"); err != nil {
-               return err
-       } else {
-               runner.Stderr = NewThrottledLogger(w)
-       }
-
-       if stdinRdr != nil {
-               go func() {
-                       _, err := io.Copy(response.Conn, stdinRdr)
-                       if err != nil {
-                               runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err)
-                               runner.stop(nil)
-                       }
-                       stdinRdr.Close()
-                       response.CloseWrite()
-               }()
-       } else if len(stdinJSON) != 0 {
-               go func() {
-                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJSON))
-                       if err != nil {
-                               runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
-                               runner.stop(nil)
-                       }
-                       response.CloseWrite()
-               }()
-       }
-
-       go runner.ProcessDockerAttach(response.Reader)
-
-       return nil
-}
-
 func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
        stdoutPath := mntPath[len(runner.Container.OutputPath):]
        index := strings.LastIndex(stdoutPath, "/")
@@ -1058,86 +875,113 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
 }
 
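-// CreateContainer creates the docker container.
+// CreateContainer sets up the container's stdin/stdout/stderr and environment, then creates the container via runner.executor.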
-func (runner *ContainerRunner) CreateContainer() error {
-       runner.CrunchLog.Print("Creating Docker container")
-
-       runner.ContainerConfig.Cmd = runner.Container.Command
-       if runner.Container.Cwd != "." {
-               runner.ContainerConfig.WorkingDir = runner.Container.Cwd
+func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
+       var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil)) // default: empty stdin when there is no "stdin" mount
+       if mnt, ok := runner.Container.Mounts["stdin"]; ok {
+               switch mnt.Kind {
+               case "collection": // stdin is read from a file under the arv-mount by_id tree
+                       var collID string
+                       if mnt.UUID != "" { // prefer the UUID, fall back to the portable data hash
+                               collID = mnt.UUID
+                       } else {
+                               collID = mnt.PortableDataHash
+                       }
+                       path := runner.ArvMountPoint + "/by_id/" + collID + "/" + mnt.Path
+                       f, err := os.Open(path)
+                       if err != nil {
+                               return err
+                       }
+                       stdin = f // NOTE(review): f is not closed if a later step in this function returns an error -- confirm
+               case "json": // stdin is the JSON encoding of the mount's Content
+                       j, err := json.Marshal(mnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("error encoding stdin json data: %v", err)
+                       }
+                       stdin = ioutil.NopCloser(bytes.NewReader(j))
+               default:
+                       return fmt.Errorf("stdin mount has unsupported kind %q", mnt.Kind)
+               }
        }
 
-       for k, v := range runner.Container.Environment {
-               runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
+       var stdout, stderr io.WriteCloser
+       if mnt, ok := runner.Container.Mounts["stdout"]; ok { // a "stdout" mount redirects output to a file; otherwise use a throttled log writer
+               f, err := runner.getStdoutFile(mnt.Path)
+               if err != nil {
+                       return err
+               }
+               stdout = f
+       } else if w, err := runner.NewLogWriter("stdout"); err != nil {
+               return err
+       } else {
+               stdout = NewThrottledLogger(w)
        }
 
-       runner.ContainerConfig.Volumes = runner.Volumes
-
-       maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
-       minDockerRAM := int64(16)
-       if maxRAM < minDockerRAM*1024*1024 {
-               // Docker daemon won't let you set a limit less than ~10 MiB
-               maxRAM = minDockerRAM * 1024 * 1024
-       }
-       runner.HostConfig = dockercontainer.HostConfig{
-               Binds: runner.Binds,
-               LogConfig: dockercontainer.LogConfig{
-                       Type: "none",
-               },
-               Resources: dockercontainer.Resources{
-                       CgroupParent: runner.setCgroupParent,
-                       NanoCPUs:     int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000,
-                       Memory:       maxRAM, // RAM
-                       MemorySwap:   maxRAM, // RAM+swap
-                       KernelMemory: maxRAM, // kernel portion
-               },
+       if mnt, ok := runner.Container.Mounts["stderr"]; ok { // stderr is handled the same way as stdout (getStdoutFile is reused for the path)
+               f, err := runner.getStdoutFile(mnt.Path)
+               if err != nil {
+                       return err
+               }
+               stderr = f
+       } else if w, err := runner.NewLogWriter("stderr"); err != nil {
+               return err
+       } else {
+               stderr = NewThrottledLogger(w)
        }
 
+       env := runner.Container.Environment // used as-is unless API access is requested below
+       enableNetwork := runner.enableNetwork == "always"
        if runner.Container.RuntimeConstraints.API {
+               enableNetwork = true // containers with RuntimeConstraints.API set always get networking
                tok, err := runner.ContainerToken()
                if err != nil {
                        return err
                }
-               runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
-                       "ARVADOS_API_TOKEN="+tok,
-                       "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
-                       "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
-               )
-               runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
-       } else {
-               if runner.enableNetwork == "always" {
-                       runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
-               } else {
-                       runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
-               }
-       }
-
-       _, stdinUsed := runner.Container.Mounts["stdin"]
-       runner.ContainerConfig.OpenStdin = stdinUsed
-       runner.ContainerConfig.StdinOnce = stdinUsed
-       runner.ContainerConfig.AttachStdin = stdinUsed
-       runner.ContainerConfig.AttachStdout = true
-       runner.ContainerConfig.AttachStderr = true
-
-       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
-       if err != nil {
-               return fmt.Errorf("While creating container: %v", err)
-       }
-
-       runner.ContainerID = createdBody.ID
-
-       return runner.AttachStreams()
+               env = map[string]string{} // copy before adding API variables so runner.Container.Environment itself is not modified
+               for k, v := range runner.Container.Environment {
+                       env[k] = v
+               }
+               env["ARVADOS_API_TOKEN"] = tok
+               env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST")
+               env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
+       }
+       workdir := runner.Container.Cwd
+       if workdir == "." {
+               // both "" and "." mean default
+               workdir = ""
+       }
+       ram := runner.Container.RuntimeConstraints.RAM
+       if !runner.enableMemoryLimit {
+               ram = 0 // presumably 0 tells the executor not to apply a memory limit -- TODO confirm against the executor implementations
+       }
+       runner.executorStdin = stdin // the three streams are kept on the runner so WaitFinish can close them
+       runner.executorStdout = stdout
+       runner.executorStderr = stderr
+       return runner.executor.Create(containerSpec{
+               Image:         imageID,
+               VCPUs:         runner.Container.RuntimeConstraints.VCPUs,
+               RAM:           ram,
+               WorkingDir:    workdir,
+               Env:           env,
+               BindMounts:    bindmounts,
+               Command:       runner.Container.Command,
+               EnableNetwork: enableNetwork,
+               NetworkMode:   runner.networkMode,
+               CgroupParent:  runner.setCgroupParent,
+               Stdin:         stdin,
+               Stdout:        stdout,
+               Stderr:        stderr,
+       })
 }
 
-// StartContainer starts the docker container created by CreateContainer.
+// StartContainer starts the container created by CreateContainer.
 func (runner *ContainerRunner) StartContainer() error {
-       runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
+       runner.CrunchLog.Printf("Starting container")
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
        if runner.cCancelled {
                return ErrCancelled
        }
-       err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerStartOptions{})
+       err := runner.executor.Start()
        if err != nil {
                var advice string
                if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
@@ -1151,71 +995,60 @@ func (runner *ContainerRunner) StartContainer() error {
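-// WaitFinish waits for the container to terminate, capture the exit code, and
-// close the stdout/stderr logging.
+// WaitFinish waits for the container to terminate, captures the exit code, and
+// closes the container's stdin/stdout/stderr streams and the crunchstat reporter.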
 func (runner *ContainerRunner) WaitFinish() error {
-       var runTimeExceeded <-chan time.Time
        runner.CrunchLog.Print("Waiting for container to finish")
-
-       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
-       arvMountExit := runner.ArvMountExit
-       if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
-               runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
+       var timeout <-chan time.Time // stays nil (never ready in the select below) unless MaxRunTime is set
+       if s := runner.Container.SchedulingParameters.MaxRunTime; s > 0 {
+               timeout = time.After(time.Duration(s) * time.Second)
        }
-
-       containerGone := make(chan struct{})
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel() // releases the watcher goroutine below when WaitFinish returns
        go func() {
-               defer close(containerGone)
-               if runner.containerWatchdogInterval < 1 {
-                       runner.containerWatchdogInterval = time.Minute
-               }
-               for range time.NewTicker(runner.containerWatchdogInterval).C {
-                       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
-                       ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
-                       cancel()
-                       runner.cStateLock.Lock()
-                       done := runner.cRemoved || runner.ExitCode != nil
-                       runner.cStateLock.Unlock()
-                       if done {
-                               return
-                       } else if err != nil {
-                               runner.CrunchLog.Printf("Error inspecting container: %s", err)
-                               runner.checkBrokenNode(err)
-                               return
-                       } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
-                               runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State)
-                               return
-                       }
-               }
-       }()
-
-       for {
                select {
-               case waitBody := <-waitOk:
-                       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
-                       code := int(waitBody.StatusCode)
-                       runner.ExitCode = &code
-
-                       // wait for stdout/stderr to complete
-                       <-runner.loggingDone
-                       return nil
-
-               case err := <-waitErr:
-                       return fmt.Errorf("container wait: %v", err)
-
-               case <-arvMountExit:
-                       runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
-                       runner.stop(nil)
-                       // arvMountExit will always be ready now that
-                       // it's closed, but that doesn't interest us.
-                       arvMountExit = nil
-
-               case <-runTimeExceeded:
+               case <-timeout:
                        runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.")
                        runner.stop(nil)
-                       runTimeExceeded = nil
+               case <-runner.ArvMountExit:
+                       runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
+                       runner.stop(nil)
+               case <-ctx.Done(): // WaitFinish is returning (cancel was called); nothing left to watch
+               }
+       }()
+       exitcode, err := runner.executor.Wait(ctx)
+       if err != nil {
+               runner.checkBrokenNode(err)
+               return err // NOTE(review): the streams and statReporter are left open on this path -- confirm that is intended
+       }
+       runner.ExitCode = &exitcode
 
-               case <-containerGone:
-                       return errors.New("docker client never returned status")
+       var returnErr error // first stream-close error, if any; later close errors are only logged
+       if err = runner.executorStdin.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdin: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               returnErr = err
+       }
+       if err = runner.executorStdout.Close(); err != nil {
+               err = fmt.Errorf("error closing container stdout: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+       if err = runner.executorStderr.Close(); err != nil {
+               err = fmt.Errorf("error closing container stderr: %s", err)
+               runner.CrunchLog.Printf("%s", err)
+               if returnErr == nil {
+                       returnErr = err
+               }
+       }
+
+       if runner.statReporter != nil {
+               runner.statReporter.Stop()
+               err = runner.statLogger.Close()
+               if err != nil {
+                       runner.CrunchLog.Printf("error closing crunchstat logs: %v", err) // logged only; does not affect the returned error
                }
        }
+       return returnErr
 }
 
 func (runner *ContainerRunner) updateLogs() {
@@ -1268,7 +1101,7 @@ func (runner *ContainerRunner) updateLogs() {
 
 // CaptureOutput saves data from the container's output directory if
 // needed, and updates the container output accordingly.
-func (runner *ContainerRunner) CaptureOutput() error {
+func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
        if runner.Container.RuntimeConstraints.API {
                // Output may have been set directly by the container, so
                // refresh the container record to check.
@@ -1290,7 +1123,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
                keepClient:    runner.ContainerKeepClient,
                hostOutputDir: runner.HostOutputDir,
                ctrOutputDir:  runner.Container.OutputPath,
-               binds:         runner.Binds,
+               bindmounts:    bindmounts,
                mounts:        runner.Container.Mounts,
                secretMounts:  runner.SecretMounts,
                logger:        runner.CrunchLog,
@@ -1366,12 +1199,14 @@ func (runner *ContainerRunner) CleanupDirs() {
                                }
                        }
                }
+               runner.ArvMount = nil
        }
 
        if runner.ArvMountPoint != "" {
                if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
                        runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
                }
+               runner.ArvMountPoint = ""
        }
 
        if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
@@ -1431,15 +1266,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
                // Already finalized.
                return
        }
-       mt, err := runner.LogCollection.MarshalManifest(".")
-       if err != nil {
-               err = fmt.Errorf("error creating log manifest: %v", err)
-               return
-       }
        updates := arvadosclient.Dict{
-               "name":          "logs for " + runner.Container.UUID,
-               "manifest_text": mt,
+               "name": "logs for " + runner.Container.UUID,
+       }
+       mt, err1 := runner.LogCollection.MarshalManifest(".")
+       if err1 == nil {
+               // Only send updated manifest text if there was no
+               // error.
+               updates["manifest_text"] = mt
        }
+
+       // Even if flushing the manifest had an error, we still want
+       // to update the log record, if possible, to push the trash_at
+       // and delete_at times into the future.  Details on bug
+       // #17293.
        if final {
                updates["is_trashed"] = true
        } else {
@@ -1448,16 +1288,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
                updates["delete_at"] = exp
        }
        reqBody := arvadosclient.Dict{"collection": updates}
+       var err2 error
        if runner.logUUID == "" {
                reqBody["ensure_unique_name"] = true
-               err = runner.DispatcherArvClient.Create("collections", reqBody, &response)
+               err2 = runner.DispatcherArvClient.Create("collections", reqBody, &response)
        } else {
-               err = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
+               err2 = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
        }
-       if err != nil {
-               return
+       if err2 == nil {
+               runner.logUUID = response.UUID
+       }
+
+       if err1 != nil || err2 != nil {
+               err = fmt.Errorf("error recording logs: %q, %q", err1, err2)
        }
-       runner.logUUID = response.UUID
        return
 }
 
@@ -1469,7 +1313,7 @@ func (runner *ContainerRunner) UpdateContainerRunning() error {
                return ErrCancelled
        }
        return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
-               arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
+               arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil)
 }
 
 // ContainerToken returns the api_token the container (and any
@@ -1557,6 +1401,7 @@ func (runner *ContainerRunner) Run() (err error) {
                return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State)
        }
 
+       var bindmounts map[string]bindmount
        defer func() {
                // checkErr prints e (unless it's nil) and sets err to
                // e (unless err is already non-nil). Thus, if err
@@ -1591,9 +1436,12 @@ func (runner *ContainerRunner) Run() (err error) {
                        // capture partial output and write logs
                }
 
-               checkErr("CaptureOutput", runner.CaptureOutput())
+               if bindmounts != nil {
+                       checkErr("CaptureOutput", runner.CaptureOutput(bindmounts))
+               }
                checkErr("stopHoststat", runner.stopHoststat())
                checkErr("CommitLogs", runner.CommitLogs())
+               runner.CleanupDirs()
                checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
        }()
 
@@ -1603,8 +1451,16 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
+       // set up FUSE mount and binds
+       bindmounts, err = runner.SetupMounts()
+       if err != nil {
+               runner.finalState = "Cancelled"
+               err = fmt.Errorf("While setting up mounts: %v", err)
+               return
+       }
+
        // check for and/or load image
-       err = runner.LoadImage()
+       imageID, err := runner.LoadImage()
        if err != nil {
                if !runner.checkBrokenNode(err) {
                        // Failed to load image but not due to a "broken node"
@@ -1615,15 +1471,7 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       // set up FUSE mount and binds
-       err = runner.SetupMounts()
-       if err != nil {
-               runner.finalState = "Cancelled"
-               err = fmt.Errorf("While setting up mounts: %v", err)
-               return
-       }
-
-       err = runner.CreateContainer()
+       err = runner.CreateContainer(imageID, bindmounts)
        if err != nil {
                return
        }
@@ -1699,6 +1547,9 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
                return fmt.Errorf("error creating container API client: %v", err)
        }
 
+       runner.ContainerKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+       runner.DispatcherKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+
        err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
        if err != nil {
                if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {
@@ -1716,14 +1567,12 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
 func NewContainerRunner(dispatcherClient *arvados.Client,
        dispatcherArvClient IArvadosClient,
        dispatcherKeepClient IKeepClient,
-       docker ThinDockerClient,
        containerUUID string) (*ContainerRunner, error) {
 
        cr := &ContainerRunner{
                dispatcherClient:     dispatcherClient,
                DispatcherArvClient:  dispatcherArvClient,
                DispatcherKeepClient: dispatcherKeepClient,
-               Docker:               docker,
        }
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
@@ -1773,15 +1622,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
        list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
-       enableNetwork := flags.String("container-enable-networking", "default",
-               `Specify if networking should be enabled for container.  One of 'default', 'always':
-       default: only enable networking if container requests it.
-       always:  containers always have networking enabled
-       `)
-       networkMode := flags.String("container-network-mode", "default",
-               `Set networking mode for container.  Corresponds to Docker network mode (--net).
-       `)
+       enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
+       enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
+       networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
        memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
+       runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
        flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
 
        ignoreDetachFlag := false
@@ -1814,18 +1659,18 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                }
        }
 
-       containerID := flags.Arg(0)
+       containerUUID := flags.Arg(0)
 
        switch {
        case *detach && !ignoreDetachFlag:
-               return Detach(containerID, prog, args, os.Stdout, os.Stderr)
+               return Detach(containerUUID, prog, args, os.Stdout, os.Stderr)
        case *kill >= 0:
-               return KillProcess(containerID, syscall.Signal(*kill), os.Stdout, os.Stderr)
+               return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
        case *list:
                return ListProcesses(os.Stdout, os.Stderr)
        }
 
-       if containerID == "" {
+       if containerUUID == "" {
                log.Printf("usage: %s [options] UUID", prog)
                return 1
        }
@@ -1839,38 +1684,72 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
-               log.Printf("%s: %v", containerID, err)
+               log.Printf("%s: %v", containerUUID, err)
                return 1
        }
        api.Retries = 8
 
        kc, kcerr := keepclient.MakeKeepClient(api)
        if kcerr != nil {
-               log.Printf("%s: %v", containerID, kcerr)
+               log.Printf("%s: %v", containerUUID, kcerr)
                return 1
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
-       // API version 1.21 corresponds to Docker 1.9, which is currently the
-       // minimum version we want to support.
-       docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-
-       cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
+       cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID)
        if err != nil {
                log.Print(err)
                return 1
        }
-       if dockererr != nil {
-               cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
-               cr.checkBrokenNode(dockererr)
+
+       switch *runtimeEngine {
+       case "docker":
+               cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
+       case "singularity":
+               cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf)
+       default:
+               cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine)
+               cr.CrunchLog.Close()
+               return 1
+       }
+       if err != nil {
+               cr.CrunchLog.Printf("%s: %v", containerUUID, err)
+               cr.checkBrokenNode(err)
                cr.CrunchLog.Close()
                return 1
        }
+       defer cr.executor.Close()
+
+       gwAuthSecret := os.Getenv("GatewayAuthSecret")
+       os.Unsetenv("GatewayAuthSecret")
+       if gwAuthSecret == "" {
+               // not safe to run a gateway service without an auth
+               // secret
+               cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
+       } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
+               // dispatcher did not tell us which external IP
+               // address to advertise --> no gateway service
+               cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
+       } else if de, ok := cr.executor.(*dockerExecutor); ok {
+               cr.gateway = Gateway{
+                       Address:            gwListen,
+                       AuthSecret:         gwAuthSecret,
+                       ContainerUUID:      containerUUID,
+                       DockerContainerID:  &de.containerID,
+                       Log:                cr.CrunchLog,
+                       ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+               }
+               err = cr.gateway.Start()
+               if err != nil {
+                       log.Printf("error starting gateway server: %s", err)
+                       return 1
+               }
+       }
 
-       parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".")
+       parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerUUID+".")
        if tmperr != nil {
-               log.Printf("%s: %v", containerID, tmperr)
+               log.Printf("%s: %v", containerUUID, tmperr)
                return 1
        }
 
@@ -1878,6 +1757,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent
+       cr.enableMemoryLimit = *enableMemoryLimit
        cr.enableNetwork = *enableNetwork
        cr.networkMode = *networkMode
        if *cgroupParentSubsystem != "" {
@@ -1904,7 +1784,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
 
        if runerr != nil {
-               log.Printf("%s: %v", containerID, runerr)
+               log.Printf("%s: %v", containerUUID, runerr)
                return 1
        }
        return 0