17296: Merge branch 'master'
author Tom Clegg <tom@curii.com>
Tue, 25 May 2021 21:05:20 +0000 (17:05 -0400)
committer Tom Clegg <tom@curii.com>
Tue, 25 May 2021 21:05:20 +0000 (17:05 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

21 files changed:
doc/admin/upgrading.html.textile.liquid
lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
lib/crunchrun/copier.go
lib/crunchrun/copier_test.go
lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go
lib/crunchrun/docker.go [new file with mode: 0644]
lib/crunchrun/docker_test.go [new file with mode: 0644]
lib/crunchrun/executor.go [new file with mode: 0644]
lib/crunchrun/executor_test.go [new file with mode: 0644]
lib/crunchrun/integration_test.go [new file with mode: 0644]
lib/crunchrun/logging_test.go
lib/crunchrun/singularity.go [new file with mode: 0644]
lib/crunchrun/singularity_test.go [new file with mode: 0644]
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/worker/pool.go
lib/install/deps.go
sdk/go/arvados/config.go
sdk/go/arvadostest/fixtures.go

diff --git a/doc/admin/upgrading.html.textile.liquid b/doc/admin/upgrading.html.textile.liquid
index 6d1736fb56eb28ca33d03c8b9e45b24b3cac5ca1..6386aa96ec56c6c92eb7750f8cfc79b9ffb19a2e 100644 (file)
@@ -39,6 +39,10 @@ h2(#main). development main (as of 2020-12-10)
 
 "Upgrading from 2.1.0":#v2_1_0
 
+h3. Multi-file docker image collections
+
+Typically a docker image collection contains a single @.tar@ file at the top level. Handling of atypical cases has changed. Files with extensions other than @.tar@ are now ignored (previously they could cause errors). If a docker image collection contains multiple @.tar@ files, the container now fails at runtime with the error "cannot choose from multiple tar files in image collection" (previously one of the @.tar@ files was selected). Subdirectories are ignored. The @arv keep docker@ command always creates a collection with a single @.tar@ file and never uses subdirectories, so this change will not affect most users.
+
 h3. New spelling of S3 credential configs
 
 If you use the S3 driver for Keep volumes and specify credentials in your configuration file (as opposed to using an IAM role), you should change the spelling of the @AccessKey@ and @SecretKey@ config keys to @AccessKeyID@ and @SecretAccessKey@. If you don't update them, the previous spellings will still be accepted, but warnings will be logged at server startup.
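The selection rule described in the upgrade note above is small enough to restate in a few lines of Go. This is an illustrative sketch only — the committed implementation is the LoadImage hunk in lib/crunchrun/crunchrun.go further down — but it shows the behavior change: non-@.tar@ names are skipped, and anything other than exactly one @.tar@ file is an error.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// chooseImageTar restates the new rule: ignore non-.tar names and
// require exactly one .tar file at the top level of the collection.
func chooseImageTar(filenames []string) (string, error) {
	var tarfiles []string
	for _, fnm := range filenames {
		if strings.HasSuffix(fnm, ".tar") {
			tarfiles = append(tarfiles, fnm)
		}
	}
	if len(tarfiles) == 0 {
		return "", errors.New("image collection does not include a .tar image file")
	}
	if len(tarfiles) > 1 {
		return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles)
	}
	return tarfiles[0], nil
}

func main() {
	// "README" is ignored; exactly one .tar remains, so it is chosen.
	fmt.Println(chooseImageTar([]string{"sha256deadbeef.tar", "README"}))
}
```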
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 8ad2cb53fca8d20fce9a091f5a6b781e7e8c9835..e24084ca07382bbab44fbdba0b19f4f1ef9e7208 100644 (file)
@@ -879,6 +879,9 @@ Clusters:
       # Minimum time between two attempts to run the same container
       MinRetryPeriod: 0s
 
+      # Container runtime: "docker" (default) or "singularity" (experimental)
+      RuntimeEngine: docker
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
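The matching SDK field lives in sdk/go/arvados/config.go, which is in the changed-file list but whose hunk is not shown here. Based on the Containers.RuntimeEngine key whitelisted in lib/config/export.go below, it presumably looks like this — an inferred sketch (struct name included), not the verbatim hunk:

```go
package arvados // sketch of sdk/go/arvados/config.go (hunk not shown)

// ContainersConfig is assumed to be the struct behind the "Containers"
// config section; this merge presumably adds RuntimeEngine to it.
type ContainersConfig struct {
	// ...existing fields such as MinRetryPeriod elided...
	RuntimeEngine string // "docker" (default) or "singularity" (experimental)
}
```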
diff --git a/lib/config/export.go b/lib/config/export.go
index 890d4ce4711eb4f06a00ada8e9e5adc63b2d7999..bd88c5d013af8df2546116e65a013d397ced4e6c 100644 (file)
@@ -122,6 +122,7 @@ var whitelist = map[string]bool{
        "Containers.MaxRetryAttempts":                         true,
        "Containers.MinRetryPeriod":                           true,
        "Containers.ReserveExtraRAM":                          true,
+       "Containers.RuntimeEngine":                            true,
        "Containers.ShellAccess":                              true,
        "Containers.ShellAccess.Admin":                        true,
        "Containers.ShellAccess.User":                         true,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 9e59f8c9238606ed8b0926ad2841ce66d20e3565..9f4bf011b8dbf02cf19ec4c61cc919072baf1d08 100644 (file)
@@ -885,6 +885,9 @@ Clusters:
       # Minimum time between two attempts to run the same container
       MinRetryPeriod: 0s
 
+      # Container runtime: "docker" (default) or "singularity" (experimental)
+      RuntimeEngine: docker
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
diff --git a/lib/crunchrun/copier.go b/lib/crunchrun/copier.go
index 132101028ea4d6d5b6b8a76df5238d7ceb0effb7..72c714dfa4ef47bbe4d60adff4693edd97e7b7cb 100644 (file)
@@ -55,7 +55,7 @@ type copier struct {
        keepClient    IKeepClient
        hostOutputDir string
        ctrOutputDir  string
-       binds         []string
+       bindmounts    map[string]bindmount
        mounts        map[string]arvados.Mount
        secretMounts  map[string]arvados.Mount
        logger        printfer
@@ -341,11 +341,8 @@ func (cp *copier) hostRoot(ctrRoot string) (string, error) {
        if ctrRoot == cp.ctrOutputDir {
                return cp.hostOutputDir, nil
        }
-       for _, bind := range cp.binds {
-               tokens := strings.Split(bind, ":")
-               if len(tokens) >= 2 && tokens[1] == ctrRoot {
-                       return tokens[0], nil
-               }
+       if mnt, ok := cp.bindmounts[ctrRoot]; ok {
+               return mnt.HostPath, nil
        }
        return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
 }
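The bindmount type used above is defined in the new lib/crunchrun/executor.go, which this merge adds but which is not excerpted. A minimal self-contained sketch, reconstructed from call sites in this diff (the HostPath and ReadOnly fields are visible in copier_test.go and crunchrun.go), together with the map lookup that replaces the old "host:ctr[:ro]" string parsing:

```go
package main

import "fmt"

// bindmount as implied by call sites such as
// bindmount{HostPath: src, ReadOnly: true} elsewhere in this diff.
type bindmount struct {
	HostPath string
	ReadOnly bool
}

// hostRoot reduced to its essence: the container path is now a direct
// map key rather than a "host:ctr[:ro]" string that had to be split.
func hostRoot(bindmounts map[string]bindmount, hostOutputDir, ctrOutputDir, ctrRoot string) (string, error) {
	if ctrRoot == ctrOutputDir {
		return hostOutputDir, nil
	}
	if mnt, ok := bindmounts[ctrRoot]; ok {
		return mnt.HostPath, nil
	}
	return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
}

func main() {
	bm := map[string]bindmount{"/mnt-w": {HostPath: "/tmp/bindtmp"}}
	fmt.Println(hostRoot(bm, "/tmp/outdir", "/var/spool/cwl", "/mnt-w"))
}
```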
diff --git a/lib/crunchrun/copier_test.go b/lib/crunchrun/copier_test.go
index 07fd795efe45a75c6390520a88de39e479ca1f72..5e92490163f6e34bc935eae42d2002fdea74436f 100644 (file)
@@ -132,7 +132,9 @@ func (s *copierSuite) TestSymlinkToMountedCollection(c *check.C) {
                PortableDataHash: arvadostest.FooCollectionPDH,
                Writable:         true,
        }
-       s.cp.binds = append(s.cp.binds, bindtmp+":/mnt-w")
+       s.cp.bindmounts = map[string]bindmount{
+               "/mnt-w": bindmount{HostPath: bindtmp, ReadOnly: false},
+       }
 
        c.Assert(os.Symlink("../../mnt", s.cp.hostOutputDir+"/l_dir"), check.IsNil)
        c.Assert(os.Symlink("/mnt/foo", s.cp.hostOutputDir+"/l_file"), check.IsNil)
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 88c137277cfde57ccdab722212e9c3bf2d4ba310..5638e81e4de6670673dc2163577768323919d551 100644 (file)
@@ -34,11 +34,6 @@ import (
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "git.arvados.org/arvados.git/sdk/go/manifest"
        "golang.org/x/net/context"
-
-       dockertypes "github.com/docker/docker/api/types"
-       dockercontainer "github.com/docker/docker/api/types/container"
-       dockernetwork "github.com/docker/docker/api/types/network"
-       dockerclient "github.com/docker/docker/client"
 )
 
 type command struct{}
@@ -74,20 +69,6 @@ type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
 
 type MkTempDir func(string, string) (string, error)
 
-// ThinDockerClient is the minimal Docker client interface used by crunch-run.
-type ThinDockerClient interface {
-       ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
-       ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
-               networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
-       ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
-       ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
-       ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
-       ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error)
-       ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
-       ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
-       ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
-}
-
 type PsProcess interface {
        CmdlineSlice() ([]string, error)
 }
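ThinDockerClient's replacement, the containerExecutor interface, is declared in the new lib/crunchrun/executor.go, also not excerpted. Reconstructed from the calls made throughout this diff — ImageLoaded/LoadImage for image handling, Create/Start/Stop/Wait for the container lifecycle, CgroupID for crunchstat, Close for cleanup — it is approximately the following; the exact signatures are inferences, and the containerSpec fields are sketched later, after the CreateContainer hunk:

```go
package crunchrun // sketch only; real file is lib/crunchrun/executor.go

import "context"

type containerSpec struct{} // placeholder; fields sketched after the CreateContainer hunk below

// Inferred shape of containerExecutor; the concrete implementations
// are dockerExecutor (docker.go) and singularityExecutor
// (singularity.go), both added by this merge.
type containerExecutor interface {
	ImageLoaded(imageID string) bool
	LoadImage(filename string) error
	Create(spec containerSpec) error
	Start() error
	CgroupID() string
	Stop() error
	Wait(ctx context.Context) (int, error)
	Close()
}
```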
@@ -95,7 +76,7 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-       Docker ThinDockerClient
+       executor containerExecutor
 
        // Dispatcher client is initialized with the Dispatcher token.
        // This is a privileged token used to manage container status
@@ -119,35 +100,30 @@ type ContainerRunner struct {
        ContainerArvClient  IArvadosClient
        ContainerKeepClient IKeepClient
 
-       Container       arvados.Container
-       ContainerConfig dockercontainer.Config
-       HostConfig      dockercontainer.HostConfig
-       token           string
-       ContainerID     string
-       ExitCode        *int
-       NewLogWriter    NewLogWriter
-       loggingDone     chan bool
-       CrunchLog       *ThrottledLogger
-       Stdout          io.WriteCloser
-       Stderr          io.WriteCloser
-       logUUID         string
-       logMtx          sync.Mutex
-       LogCollection   arvados.CollectionFileSystem
-       LogsPDH         *string
-       RunArvMount     RunArvMount
-       MkTempDir       MkTempDir
-       ArvMount        *exec.Cmd
-       ArvMountPoint   string
-       HostOutputDir   string
-       Binds           []string
-       Volumes         map[string]struct{}
-       OutputPDH       *string
-       SigChan         chan os.Signal
-       ArvMountExit    chan error
-       SecretMounts    map[string]arvados.Mount
-       MkArvClient     func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
-       finalState      string
-       parentTemp      string
+       Container     arvados.Container
+       token         string
+       ExitCode      *int
+       NewLogWriter  NewLogWriter
+       CrunchLog     *ThrottledLogger
+       Stdout        io.WriteCloser
+       Stderr        io.WriteCloser
+       logUUID       string
+       logMtx        sync.Mutex
+       LogCollection arvados.CollectionFileSystem
+       LogsPDH       *string
+       RunArvMount   RunArvMount
+       MkTempDir     MkTempDir
+       ArvMount      *exec.Cmd
+       ArvMountPoint string
+       HostOutputDir string
+       Volumes       map[string]struct{}
+       OutputPDH     *string
+       SigChan       chan os.Signal
+       ArvMountExit  chan error
+       SecretMounts  map[string]arvados.Mount
+       MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
+       finalState    string
+       parentTemp    string
 
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
@@ -171,19 +147,20 @@ type ContainerRunner struct {
 
        cStateLock sync.Mutex
        cCancelled bool // StopContainer() invoked
-       cRemoved   bool // docker confirmed the container no longer exists
 
-       enableNetwork string // one of "default" or "always"
-       networkMode   string // passed through to HostConfig.NetworkMode
-       arvMountLog   *ThrottledLogger
+       enableMemoryLimit bool
+       enableNetwork     string // one of "default" or "always"
+       networkMode       string // "none", "host", or "" -- passed through to executor
+       arvMountLog       *ThrottledLogger
 
        containerWatchdogInterval time.Duration
 
        gateway Gateway
 }
 
-// setupSignals sets up signal handling to gracefully terminate the underlying
-// Docker container and update state when receiving a TERM, INT or QUIT signal.
+// setupSignals sets up signal handling to gracefully terminate the
+// underlying container and update state when receiving a TERM, INT or
+// QUIT signal.
 func (runner *ContainerRunner) setupSignals() {
        runner.SigChan = make(chan os.Signal, 1)
        signal.Notify(runner.SigChan, syscall.SIGTERM)
@@ -197,24 +174,18 @@ func (runner *ContainerRunner) setupSignals() {
        }(runner.SigChan)
 }
 
-// stop the underlying Docker container.
+// stop the underlying container.
 func (runner *ContainerRunner) stop(sig os.Signal) {
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
        if sig != nil {
                runner.CrunchLog.Printf("caught signal: %v", sig)
        }
-       if runner.ContainerID == "" {
-               return
-       }
        runner.cCancelled = true
-       runner.CrunchLog.Printf("removing container")
-       err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+       runner.CrunchLog.Printf("stopping container")
+       err := runner.executor.Stop()
        if err != nil {
-               runner.CrunchLog.Printf("error removing container: %s", err)
-       }
-       if err == nil || strings.Contains(err.Error(), "No such container: "+runner.ContainerID) {
-               runner.cRemoved = true
+               runner.CrunchLog.Printf("error stopping container: %s", err)
        }
 }
 
@@ -262,57 +233,44 @@ func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
// LoadImage determines the docker image id from the container record and
// checks if it is available in the local Docker image store.  If not, it loads
// the image from Keep.
-func (runner *ContainerRunner) LoadImage() (err error) {
-
+func (runner *ContainerRunner) LoadImage() (string, error) {
        runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
 
-       var collection arvados.Collection
-       err = runner.ContainerArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
+       d, err := os.Open(runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage)
+       if err != nil {
+               return "", err
+       }
+       defer d.Close()
+       allfiles, err := d.Readdirnames(-1)
        if err != nil {
-               return fmt.Errorf("While getting container image collection: %v", err)
+               return "", err
        }
-       manifest := manifest.Manifest{Text: collection.ManifestText}
-       var img, imageID string
-       for ms := range manifest.StreamIter() {
-               img = ms.FileStreamSegments[0].Name
-               if !strings.HasSuffix(img, ".tar") {
-                       return fmt.Errorf("First file in the container image collection does not end in .tar")
+       var tarfiles []string
+       for _, fnm := range allfiles {
+               if strings.HasSuffix(fnm, ".tar") {
+                       tarfiles = append(tarfiles, fnm)
                }
-               imageID = img[:len(img)-4]
        }
+       if len(tarfiles) == 0 {
+               return "", fmt.Errorf("image collection does not include a .tar image file")
+       }
+       if len(tarfiles) > 1 {
+               return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles)
+       }
+       imageID := tarfiles[0][:len(tarfiles[0])-4]
+       imageFile := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + tarfiles[0]
+       runner.CrunchLog.Printf("Using Docker image id %q", imageID)
 
-       runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
-
-       _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
-       if err != nil {
+       if !runner.executor.ImageLoaded(imageID) {
                runner.CrunchLog.Print("Loading Docker image from keep")
-
-               var readCloser io.ReadCloser
-               readCloser, err = runner.ContainerKeepClient.ManifestFileReader(manifest, img)
+               err = runner.executor.LoadImage(imageFile)
                if err != nil {
-                       return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
+                       return "", err
                }
-
-               response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
-               if err != nil {
-                       return fmt.Errorf("While loading container image into Docker: %v", err)
-               }
-
-               defer response.Body.Close()
-               rbody, err := ioutil.ReadAll(response.Body)
-               if err != nil {
-                       return fmt.Errorf("Reading response to image load: %v", err)
-               }
-               runner.CrunchLog.Printf("Docker response: %s", rbody)
        } else {
                runner.CrunchLog.Print("Docker image is available")
        }
-
-       runner.ContainerConfig.Image = imageID
-
-       runner.ContainerKeepClient.ClearBlockCache()
-
-       return nil
+       return imageID, nil
 }
 
 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
@@ -334,7 +292,7 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
        }
        runner.arvMountLog = NewThrottledLogger(w)
        c.Stdout = runner.arvMountLog
-       c.Stderr = runner.arvMountLog
+       c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr)
 
        runner.CrunchLog.Printf("Running %v", c.Args)
 
@@ -418,16 +376,18 @@ func copyfile(src string, dst string) (err error) {
        return nil
 }
 
-func (runner *ContainerRunner) SetupMounts() (err error) {
-       err = runner.SetupArvMountPoint("keep")
+func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
+       bindmounts := map[string]bindmount{}
+       err := runner.SetupArvMountPoint("keep")
        if err != nil {
-               return fmt.Errorf("While creating keep mount temp dir: %v", err)
+               return nil, fmt.Errorf("While creating keep mount temp dir: %v", err)
        }
 
        token, err := runner.ContainerToken()
        if err != nil {
-               return fmt.Errorf("could not get container token: %s", err)
+               return nil, fmt.Errorf("could not get container token: %s", err)
        }
+       runner.CrunchLog.Printf("container token %q", token)
 
        pdhOnly := true
        tmpcount := 0
@@ -442,8 +402,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        }
 
        collectionPaths := []string{}
-       runner.Binds = nil
-       runner.Volumes = make(map[string]struct{})
        needCertMount := true
        type copyFile struct {
                src  string
@@ -457,11 +415,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        }
        for bind := range runner.SecretMounts {
                if _, ok := runner.Container.Mounts[bind]; ok {
-                       return fmt.Errorf("secret mount %q conflicts with regular mount", bind)
+                       return nil, fmt.Errorf("secret mount %q conflicts with regular mount", bind)
                }
                if runner.SecretMounts[bind].Kind != "json" &&
                        runner.SecretMounts[bind].Kind != "text" {
-                       return fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted",
+                       return nil, fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted",
                                bind, runner.SecretMounts[bind].Kind)
                }
                binds = append(binds, bind)
@@ -476,7 +434,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                if bind == "stdout" || bind == "stderr" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
-                               return fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind)
+                               return nil, fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind)
                        }
 
                        // Does path start with OutputPath?
@@ -485,14 +443,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                prefix += "/"
                        }
                        if !strings.HasPrefix(mnt.Path, prefix) {
-                               return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
+                               return nil, fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
                        }
                }
 
                if bind == "stdin" {
                        // Is it a "collection" mount kind?
                        if mnt.Kind != "collection" && mnt.Kind != "json" {
-                               return fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind)
+                               return nil, fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind)
                        }
                }
 
@@ -502,7 +460,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
                if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
                        if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" {
-                               return fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
+                               return nil, fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
                        }
                }
 
@@ -510,17 +468,17 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                case mnt.Kind == "collection" && bind != "stdin":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
-                               return fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
+                               return nil, fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
                        }
                        if mnt.UUID != "" {
                                if mnt.Writable {
-                                       return fmt.Errorf("writing to existing collections currently not permitted")
+                                       return nil, fmt.Errorf("writing to existing collections currently not permitted")
                                }
                                pdhOnly = false
                                src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
                        } else if mnt.PortableDataHash != "" {
                                if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
-                                       return fmt.Errorf("can never write to a collection specified by portable data hash")
+                                       return nil, fmt.Errorf("can never write to a collection specified by portable data hash")
                                }
                                idx := strings.Index(mnt.PortableDataHash, "/")
                                if idx > 0 {
@@ -546,14 +504,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if mnt.Writable {
                                if bind == runner.Container.OutputPath {
                                        runner.HostOutputDir = src
-                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
+                                       bindmounts[bind] = bindmount{HostPath: src}
                                } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
                                        copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
                                } else {
-                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
+                                       bindmounts[bind] = bindmount{HostPath: src}
                                }
                        } else {
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
+                               bindmounts[bind] = bindmount{HostPath: src, ReadOnly: true}
                        }
                        collectionPaths = append(collectionPaths, src)
 
@@ -561,17 +519,17 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        var tmpdir string
                        tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
                        if err != nil {
-                               return fmt.Errorf("while creating mount temp dir: %v", err)
+                               return nil, fmt.Errorf("while creating mount temp dir: %v", err)
                        }
                        st, staterr := os.Stat(tmpdir)
                        if staterr != nil {
-                               return fmt.Errorf("while Stat on temp dir: %v", staterr)
+                               return nil, fmt.Errorf("while Stat on temp dir: %v", staterr)
                        }
                        err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
                       if err != nil {
-                               return fmt.Errorf("while Chmod temp dir: %v", err)
+                               return nil, fmt.Errorf("while Chmod temp dir: %v", err)
                        }
-                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
+                       bindmounts[bind] = bindmount{HostPath: tmpdir}
                        if bind == runner.Container.OutputPath {
                                runner.HostOutputDir = tmpdir
                        }
@@ -581,53 +539,53 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if mnt.Kind == "json" {
                                filedata, err = json.Marshal(mnt.Content)
                                if err != nil {
-                                       return fmt.Errorf("encoding json data: %v", err)
+                                       return nil, fmt.Errorf("encoding json data: %v", err)
                                }
                        } else {
                                text, ok := mnt.Content.(string)
                                if !ok {
-                                       return fmt.Errorf("content for mount %q must be a string", bind)
+                                       return nil, fmt.Errorf("content for mount %q must be a string", bind)
                                }
                                filedata = []byte(text)
                        }
 
                        tmpdir, err := runner.MkTempDir(runner.parentTemp, mnt.Kind)
                        if err != nil {
-                               return fmt.Errorf("creating temp dir: %v", err)
+                               return nil, fmt.Errorf("creating temp dir: %v", err)
                        }
                        tmpfn := filepath.Join(tmpdir, "mountdata."+mnt.Kind)
                        err = ioutil.WriteFile(tmpfn, filedata, 0444)
                        if err != nil {
-                               return fmt.Errorf("writing temp file: %v", err)
+                               return nil, fmt.Errorf("writing temp file: %v", err)
                        }
                        if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
                                copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
                        } else {
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
+                               bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
                        }
 
                case mnt.Kind == "git_tree":
                        tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
                        if err != nil {
-                               return fmt.Errorf("creating temp dir: %v", err)
+                               return nil, fmt.Errorf("creating temp dir: %v", err)
                        }
                        err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
                        if err != nil {
-                               return err
+                               return nil, err
                        }
-                       runner.Binds = append(runner.Binds, tmpdir+":"+bind+":ro")
+                       bindmounts[bind] = bindmount{HostPath: tmpdir, ReadOnly: true}
                }
        }
 
        if runner.HostOutputDir == "" {
-               return fmt.Errorf("output path does not correspond to a writable mount point")
+               return nil, fmt.Errorf("output path does not correspond to a writable mount point")
        }
 
        if needCertMount && runner.Container.RuntimeConstraints.API {
                for _, certfile := range arvadosclient.CertFiles {
                        _, err := os.Stat(certfile)
                        if err == nil {
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile))
+                               bindmounts["/etc/arvados/ca-certificates.crt"] = bindmount{HostPath: certfile, ReadOnly: true}
                                break
                        }
                }
@@ -642,20 +600,20 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
        if err != nil {
-               return fmt.Errorf("while trying to start arv-mount: %v", err)
+               return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
        }
 
        for _, p := range collectionPaths {
                _, err = os.Stat(p)
                if err != nil {
-                       return fmt.Errorf("while checking that input files exist: %v", err)
+                       return nil, fmt.Errorf("while checking that input files exist: %v", err)
                }
        }
 
        for _, cp := range copyFiles {
                st, err := os.Stat(cp.src)
                if err != nil {
-                       return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+                       return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
                }
                if st.IsDir() {
                        err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
@@ -686,59 +644,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                }
                if err != nil {
-                       return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+                       return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
                }
        }
 
-       return nil
-}
-
-func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
-       // Handle docker log protocol
-       // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
-       defer close(runner.loggingDone)
-
-       header := make([]byte, 8)
-       var err error
-       for err == nil {
-               _, err = io.ReadAtLeast(containerReader, header, 8)
-               if err != nil {
-                       if err == io.EOF {
-                               err = nil
-                       }
-                       break
-               }
-               readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
-               if header[0] == 1 {
-                       // stdout
-                       _, err = io.CopyN(runner.Stdout, containerReader, readsize)
-               } else {
-                       // stderr
-                       _, err = io.CopyN(runner.Stderr, containerReader, readsize)
-               }
-       }
-
-       if err != nil {
-               runner.CrunchLog.Printf("error reading docker logs: %v", err)
-       }
-
-       err = runner.Stdout.Close()
-       if err != nil {
-               runner.CrunchLog.Printf("error closing stdout logs: %v", err)
-       }
-
-       err = runner.Stderr.Close()
-       if err != nil {
-               runner.CrunchLog.Printf("error closing stderr logs: %v", err)
-       }
-
-       if runner.statReporter != nil {
-               runner.statReporter.Stop()
-               err = runner.statLogger.Close()
-               if err != nil {
-                       runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
-               }
-       }
+       return bindmounts, nil
 }
 
 func (runner *ContainerRunner) stopHoststat() error {
@@ -775,7 +685,7 @@ func (runner *ContainerRunner) startCrunchstat() error {
        }
        runner.statLogger = NewThrottledLogger(w)
        runner.statReporter = &crunchstat.Reporter{
-               CID:          runner.ContainerID,
+               CID:          runner.executor.CgroupID(),
                Logger:       log.New(runner.statLogger, "", 0),
                CgroupParent: runner.expectCgroupParent,
                CgroupRoot:   runner.cgroupRoot,
@@ -938,102 +848,6 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
        return true, nil
 }
 
-// AttachStreams connects the docker container stdin, stdout and stderr logs
-// to the Arvados logger which logs to Keep and the API server logs table.
-func (runner *ContainerRunner) AttachStreams() (err error) {
-
-       runner.CrunchLog.Print("Attaching container streams")
-
-       // If stdin mount is provided, attach it to the docker container
-       var stdinRdr arvados.File
-       var stdinJSON []byte
-       if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
-               if stdinMnt.Kind == "collection" {
-                       var stdinColl arvados.Collection
-                       collID := stdinMnt.UUID
-                       if collID == "" {
-                               collID = stdinMnt.PortableDataHash
-                       }
-                       err = runner.ContainerArvClient.Get("collections", collID, nil, &stdinColl)
-                       if err != nil {
-                               return fmt.Errorf("While getting stdin collection: %v", err)
-                       }
-
-                       stdinRdr, err = runner.ContainerKeepClient.ManifestFileReader(
-                               manifest.Manifest{Text: stdinColl.ManifestText},
-                               stdinMnt.Path)
-                       if os.IsNotExist(err) {
-                               return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
-                       } else if err != nil {
-                               return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
-                       }
-               } else if stdinMnt.Kind == "json" {
-                       stdinJSON, err = json.Marshal(stdinMnt.Content)
-                       if err != nil {
-                               return fmt.Errorf("While encoding stdin json data: %v", err)
-                       }
-               }
-       }
-
-       stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
-       response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
-       if err != nil {
-               return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
-       }
-
-       runner.loggingDone = make(chan bool)
-
-       if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
-               stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
-               if err != nil {
-                       return err
-               }
-               runner.Stdout = stdoutFile
-       } else if w, err := runner.NewLogWriter("stdout"); err != nil {
-               return err
-       } else {
-               runner.Stdout = NewThrottledLogger(w)
-       }
-
-       if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
-               stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
-               if err != nil {
-                       return err
-               }
-               runner.Stderr = stderrFile
-       } else if w, err := runner.NewLogWriter("stderr"); err != nil {
-               return err
-       } else {
-               runner.Stderr = NewThrottledLogger(w)
-       }
-
-       if stdinRdr != nil {
-               go func() {
-                       _, err := io.Copy(response.Conn, stdinRdr)
-                       if err != nil {
-                               runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err)
-                               runner.stop(nil)
-                       }
-                       stdinRdr.Close()
-                       response.CloseWrite()
-               }()
-       } else if len(stdinJSON) != 0 {
-               go func() {
-                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJSON))
-                       if err != nil {
-                               runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
-                               runner.stop(nil)
-                       }
-                       response.CloseWrite()
-               }()
-       }
-
-       go runner.ProcessDockerAttach(response.Reader)
-
-       return nil
-}
-
 func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
        stdoutPath := mntPath[len(runner.Container.OutputPath):]
        index := strings.LastIndex(stdoutPath, "/")
@@ -1060,86 +874,110 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
 }
 
 // CreateContainer creates the docker container.
-func (runner *ContainerRunner) CreateContainer() error {
-       runner.CrunchLog.Print("Creating Docker container")
-
-       runner.ContainerConfig.Cmd = runner.Container.Command
-       if runner.Container.Cwd != "." {
-               runner.ContainerConfig.WorkingDir = runner.Container.Cwd
+func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
+       var stdin io.ReadCloser
+       if mnt, ok := runner.Container.Mounts["stdin"]; ok {
+               switch mnt.Kind {
+               case "collection":
+                       var collID string
+                       if mnt.UUID != "" {
+                               collID = mnt.UUID
+                       } else {
+                               collID = mnt.PortableDataHash
+                       }
+                       path := runner.ArvMountPoint + "/by_id/" + collID + "/" + mnt.Path
+                       f, err := os.Open(path)
+                       if err != nil {
+                               return err
+                       }
+                       stdin = f
+               case "json":
+                       j, err := json.Marshal(mnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("error encoding stdin json data: %v", err)
+                       }
+                       stdin = ioutil.NopCloser(bytes.NewReader(j))
+               default:
+                       return fmt.Errorf("stdin mount has unsupported kind %q", mnt.Kind)
+               }
        }
 
-       for k, v := range runner.Container.Environment {
-               runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
+       var stdout, stderr io.WriteCloser
+       if mnt, ok := runner.Container.Mounts["stdout"]; ok {
+               f, err := runner.getStdoutFile(mnt.Path)
+               if err != nil {
+                       return err
+               }
+               stdout = f
+       } else if w, err := runner.NewLogWriter("stdout"); err != nil {
+               return err
+       } else {
+               stdout = NewThrottledLogger(w)
        }
 
-       runner.ContainerConfig.Volumes = runner.Volumes
-
-       maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
-       minDockerRAM := int64(16)
-       if maxRAM < minDockerRAM*1024*1024 {
-               // Docker daemon won't let you set a limit less than ~10 MiB
-               maxRAM = minDockerRAM * 1024 * 1024
-       }
-       runner.HostConfig = dockercontainer.HostConfig{
-               Binds: runner.Binds,
-               LogConfig: dockercontainer.LogConfig{
-                       Type: "none",
-               },
-               Resources: dockercontainer.Resources{
-                       CgroupParent: runner.setCgroupParent,
-                       NanoCPUs:     int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000,
-                       Memory:       maxRAM, // RAM
-                       MemorySwap:   maxRAM, // RAM+swap
-                       KernelMemory: maxRAM, // kernel portion
-               },
+       if mnt, ok := runner.Container.Mounts["stderr"]; ok {
+               f, err := runner.getStdoutFile(mnt.Path)
+               if err != nil {
+                       return err
+               }
+               stderr = f
+       } else if w, err := runner.NewLogWriter("stderr"); err != nil {
+               return err
+       } else {
+               stderr = NewThrottledLogger(w)
        }
 
+       env := runner.Container.Environment
+       enableNetwork := runner.enableNetwork == "always"
        if runner.Container.RuntimeConstraints.API {
+               enableNetwork = true
                tok, err := runner.ContainerToken()
                if err != nil {
                        return err
                }
-               runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
-                       "ARVADOS_API_TOKEN="+tok,
-                       "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
-                       "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
-               )
-               runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
-       } else {
-               if runner.enableNetwork == "always" {
-                       runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
-               } else {
-                       runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
-               }
-       }
-
-       _, stdinUsed := runner.Container.Mounts["stdin"]
-       runner.ContainerConfig.OpenStdin = stdinUsed
-       runner.ContainerConfig.StdinOnce = stdinUsed
-       runner.ContainerConfig.AttachStdin = stdinUsed
-       runner.ContainerConfig.AttachStdout = true
-       runner.ContainerConfig.AttachStderr = true
-
-       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
-       if err != nil {
-               return fmt.Errorf("While creating container: %v", err)
-       }
-
-       runner.ContainerID = createdBody.ID
-
-       return runner.AttachStreams()
+               env = map[string]string{}
+               for k, v := range runner.Container.Environment {
+                       env[k] = v
+               }
+               env["ARVADOS_API_TOKEN"] = tok
+               env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST")
+               env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
+       }
+       workdir := runner.Container.Cwd
+       if workdir == "." {
+               // both "" and "." mean default
+               workdir = ""
+       }
+       ram := runner.Container.RuntimeConstraints.RAM
+       if !runner.enableMemoryLimit {
+               ram = 0
+       }
+       return runner.executor.Create(containerSpec{
+               Image:         imageID,
+               VCPUs:         runner.Container.RuntimeConstraints.VCPUs,
+               RAM:           ram,
+               WorkingDir:    workdir,
+               Env:           env,
+               BindMounts:    bindmounts,
+               Command:       runner.Container.Command,
+               EnableNetwork: enableNetwork,
+               NetworkMode:   runner.networkMode,
+               CgroupParent:  runner.setCgroupParent,
+               Stdin:         stdin,
+               Stdout:        stdout,
+               Stderr:        stderr,
+       })
 }
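containerSpec, the other type from the new executor.go, can be read off field-by-field from the Create call above. Types follow the values passed in (stdin is declared io.ReadCloser, stdout/stderr io.WriteCloser; RAM comes from RuntimeConstraints.RAM); this is an inferred sketch, not the committed declaration:

```go
package crunchrun // sketch only; bindmount repeated from the earlier sketch

import "io"

type bindmount struct {
	HostPath string
	ReadOnly bool
}

// containerSpec as implied by runner.executor.Create above.
type containerSpec struct {
	Image         string
	VCPUs         int
	RAM           int64
	WorkingDir    string
	Env           map[string]string
	BindMounts    map[string]bindmount
	Command       []string
	EnableNetwork bool
	NetworkMode   string
	CgroupParent  string
	Stdin         io.ReadCloser
	Stdout        io.WriteCloser
	Stderr        io.WriteCloser
}
```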
 
 // StartContainer starts the docker container created by CreateContainer.
 func (runner *ContainerRunner) StartContainer() error {
-       runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
+       runner.CrunchLog.Printf("Starting container")
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
        if runner.cCancelled {
                return ErrCancelled
        }
-       err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerStartOptions{})
+       err := runner.executor.Start()
        if err != nil {
                var advice string
                if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
@@ -1153,71 +991,39 @@ func (runner *ContainerRunner) StartContainer() error {
// WaitFinish waits for the container to terminate, captures the exit code, and
// closes the stdout/stderr logging.
 func (runner *ContainerRunner) WaitFinish() error {
-       var runTimeExceeded <-chan time.Time
        runner.CrunchLog.Print("Waiting for container to finish")
-
-       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
-       arvMountExit := runner.ArvMountExit
-       if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
-               runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
+       var timeout <-chan time.Time
+       if s := runner.Container.SchedulingParameters.MaxRunTime; s > 0 {
+               timeout = time.After(time.Duration(s) * time.Second)
        }
-
-       containerGone := make(chan struct{})
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
        go func() {
-               defer close(containerGone)
-               if runner.containerWatchdogInterval < 1 {
-                       runner.containerWatchdogInterval = time.Minute
-               }
-               for range time.NewTicker(runner.containerWatchdogInterval).C {
-                       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
-                       ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
-                       cancel()
-                       runner.cStateLock.Lock()
-                       done := runner.cRemoved || runner.ExitCode != nil
-                       runner.cStateLock.Unlock()
-                       if done {
-                               return
-                       } else if err != nil {
-                               runner.CrunchLog.Printf("Error inspecting container: %s", err)
-                               runner.checkBrokenNode(err)
-                               return
-                       } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
-                               runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State)
-                               return
-                       }
-               }
-       }()
-
-       for {
                select {
-               case waitBody := <-waitOk:
-                       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
-                       code := int(waitBody.StatusCode)
-                       runner.ExitCode = &code
-
-                       // wait for stdout/stderr to complete
-                       <-runner.loggingDone
-                       return nil
-
-               case err := <-waitErr:
-                       return fmt.Errorf("container wait: %v", err)
-
-               case <-arvMountExit:
-                       runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
-                       runner.stop(nil)
-                       // arvMountExit will always be ready now that
-                       // it's closed, but that doesn't interest us.
-                       arvMountExit = nil
-
-               case <-runTimeExceeded:
+               case <-timeout:
                        runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.")
                        runner.stop(nil)
-                       runTimeExceeded = nil
+               case <-runner.ArvMountExit:
+                       runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
+                       runner.stop(nil)
+               case <-ctx.Done():
+               }
+       }()
+       exitcode, err := runner.executor.Wait(ctx)
+       if err != nil {
+               runner.checkBrokenNode(err)
+               return err
+       }
+       runner.ExitCode = &exitcode
 
-               case <-containerGone:
-                       return errors.New("docker client never returned status")
+       if runner.statReporter != nil {
+               runner.statReporter.Stop()
+               err = runner.statLogger.Close()
+               if err != nil {
+                       runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
                }
        }
+       return nil
 }
 
 func (runner *ContainerRunner) updateLogs() {
@@ -1270,7 +1076,7 @@ func (runner *ContainerRunner) updateLogs() {
 
 // CaptureOutput saves data from the container's output directory if
 // needed, and updates the container output accordingly.
-func (runner *ContainerRunner) CaptureOutput() error {
+func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
        if runner.Container.RuntimeConstraints.API {
                // Output may have been set directly by the container, so
                // refresh the container record to check.
@@ -1292,7 +1098,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
                keepClient:    runner.ContainerKeepClient,
                hostOutputDir: runner.HostOutputDir,
                ctrOutputDir:  runner.Container.OutputPath,
-               binds:         runner.Binds,
+               bindmounts:    bindmounts,
                mounts:        runner.Container.Mounts,
                secretMounts:  runner.SecretMounts,
                logger:        runner.CrunchLog,
@@ -1568,6 +1374,7 @@ func (runner *ContainerRunner) Run() (err error) {
                return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State)
        }
 
+       var bindmounts map[string]bindmount
        defer func() {
                // checkErr prints e (unless it's nil) and sets err to
                // e (unless err is already non-nil). Thus, if err
@@ -1602,7 +1409,9 @@ func (runner *ContainerRunner) Run() (err error) {
                        // capture partial output and write logs
                }
 
-               checkErr("CaptureOutput", runner.CaptureOutput())
+               if bindmounts != nil {
+                       checkErr("CaptureOutput", runner.CaptureOutput(bindmounts))
+               }
                checkErr("stopHoststat", runner.stopHoststat())
                checkErr("CommitLogs", runner.CommitLogs())
                checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
@@ -1614,8 +1423,16 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
+       // set up FUSE mount and binds
+       bindmounts, err = runner.SetupMounts()
+       if err != nil {
+               runner.finalState = "Cancelled"
+               err = fmt.Errorf("While setting up mounts: %v", err)
+               return
+       }
+
        // check for and/or load image
-       err = runner.LoadImage()
+       imageID, err := runner.LoadImage()
        if err != nil {
                if !runner.checkBrokenNode(err) {
                        // Failed to load image but not due to a "broken node"
@@ -1626,15 +1443,7 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       // set up FUSE mount and binds
-       err = runner.SetupMounts()
-       if err != nil {
-               runner.finalState = "Cancelled"
-               err = fmt.Errorf("While setting up mounts: %v", err)
-               return
-       }
-
-       err = runner.CreateContainer()
+       err = runner.CreateContainer(imageID, bindmounts)
        if err != nil {
                return
        }
@@ -1727,14 +1536,12 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
 func NewContainerRunner(dispatcherClient *arvados.Client,
        dispatcherArvClient IArvadosClient,
        dispatcherKeepClient IKeepClient,
-       docker ThinDockerClient,
        containerUUID string) (*ContainerRunner, error) {
 
        cr := &ContainerRunner{
                dispatcherClient:     dispatcherClient,
                DispatcherArvClient:  dispatcherArvClient,
                DispatcherKeepClient: dispatcherKeepClient,
-               Docker:               docker,
        }
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
@@ -1784,15 +1591,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
        list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
-       enableNetwork := flags.String("container-enable-networking", "default",
-               `Specify if networking should be enabled for container.  One of 'default', 'always':
-       default: only enable networking if container requests it.
-       always:  containers always have networking enabled
-       `)
-       networkMode := flags.String("container-network-mode", "default",
-               `Set networking mode for container.  Corresponds to Docker network mode (--net).
-       `)
+       enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
+       enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
+       networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
        memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
+       runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
        flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
 
        ignoreDetachFlag := false
@@ -1825,18 +1628,18 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                }
        }
 
-       containerID := flags.Arg(0)
+       containerUUID := flags.Arg(0)
 
        switch {
        case *detach && !ignoreDetachFlag:
-               return Detach(containerID, prog, args, os.Stdout, os.Stderr)
+               return Detach(containerUUID, prog, args, os.Stdout, os.Stderr)
        case *kill >= 0:
-               return KillProcess(containerID, syscall.Signal(*kill), os.Stdout, os.Stderr)
+               return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
        case *list:
                return ListProcesses(os.Stdout, os.Stderr)
        }
 
-       if containerID == "" {
+       if containerUUID == "" {
                log.Printf("usage: %s [options] UUID", prog)
                return 1
        }
@@ -1850,45 +1653,62 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
-               log.Printf("%s: %v", containerID, err)
+               log.Printf("%s: %v", containerUUID, err)
                return 1
        }
        api.Retries = 8
 
        kc, kcerr := keepclient.MakeKeepClient(api)
        if kcerr != nil {
-               log.Printf("%s: %v", containerID, kcerr)
+               log.Printf("%s: %v", containerUUID, kcerr)
                return 1
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
-       // API version 1.21 corresponds to Docker 1.9, which is currently the
-       // minimum version we want to support.
-       docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-
-       cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
+       cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID)
        if err != nil {
                log.Print(err)
                return 1
        }
-       if dockererr != nil {
-               cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
-               cr.checkBrokenNode(dockererr)
+
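+       // Select the container runtime engine. The -runtime-engine
+       // flag defaults to "docker"; "singularity" support is
+       // experimental. (Cluster-wide, this corresponds to the
+       // Containers.RuntimeEngine config entry.)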
+       switch *runtimeEngine {
+       case "docker":
+               cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
+       case "singularity":
+               cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf)
+       default:
+               cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine)
                cr.CrunchLog.Close()
                return 1
        }
-
-       cr.gateway = Gateway{
-               Address:            os.Getenv("GatewayAddress"),
-               AuthSecret:         os.Getenv("GatewayAuthSecret"),
-               ContainerUUID:      containerID,
-               DockerContainerID:  &cr.ContainerID,
-               Log:                cr.CrunchLog,
-               ContainerIPAddress: dockerContainerIPAddress(&cr.ContainerID),
+       if err != nil {
+               cr.CrunchLog.Printf("%s: %v", containerUUID, err)
+               cr.checkBrokenNode(err)
+               cr.CrunchLog.Close()
+               return 1
        }
+       defer cr.executor.Close()
+
+       gwAuthSecret := os.Getenv("GatewayAuthSecret")
        os.Unsetenv("GatewayAuthSecret")
-       if cr.gateway.Address != "" {
+       if gwAuthSecret == "" {
+               // not safe to run a gateway service without an auth
+               // secret
+               cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
+       } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
+               // dispatcher did not tell us which external IP
+               // address to advertise --> no gateway service
+               cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
+       } else if de, ok := cr.executor.(*dockerExecutor); ok {
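+               // For now, only the docker executor can host the
+               // container gateway: the gateway needs the docker
+               // container ID to look up the container's IP address.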
+               cr.gateway = Gateway{
+                       Address:            gwListen,
+                       AuthSecret:         gwAuthSecret,
+                       ContainerUUID:      containerUUID,
+                       DockerContainerID:  &de.containerID,
+                       Log:                cr.CrunchLog,
+                       ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+               }
                err = cr.gateway.Start()
                if err != nil {
                        log.Printf("error starting gateway server: %s", err)
@@ -1896,9 +1716,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                }
        }
 
-       parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".")
+       parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerUUID+".")
        if tmperr != nil {
-               log.Printf("%s: %v", containerID, tmperr)
+               log.Printf("%s: %v", containerUUID, tmperr)
                return 1
        }
 
@@ -1906,6 +1726,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent
+       cr.enableMemoryLimit = *enableMemoryLimit
        cr.enableNetwork = *enableNetwork
        cr.networkMode = *networkMode
        if *cgroupParentSubsystem != "" {
@@ -1932,7 +1753,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
 
        if runerr != nil {
-               log.Printf("%s: %v", containerID, runerr)
+               log.Printf("%s: %v", containerUUID, runerr)
                return 1
        }
        return 0
index dbdaa6293d28c964efc237c9e1b98e44b5ef921c..5f7e71d95793e304c49dbd92ba5ac5fb9534ad93 100644 (file)
@@ -5,7 +5,6 @@
 package crunchrun
 
 import (
-       "bufio"
        "bytes"
        "crypto/md5"
        "encoding/json"
@@ -13,11 +12,10 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "net"
        "os"
        "os/exec"
+       "regexp"
        "runtime/pprof"
-       "sort"
        "strings"
        "sync"
        "syscall"
@@ -30,9 +28,6 @@ import (
        "git.arvados.org/arvados.git/sdk/go/manifest"
        "golang.org/x/net/context"
 
-       dockertypes "github.com/docker/docker/api/types"
-       dockercontainer "github.com/docker/docker/api/types/container"
-       dockernetwork "github.com/docker/docker/api/types/network"
        . "gopkg.in/check.v1"
 )
 
@@ -41,18 +36,43 @@ func TestCrunchExec(t *testing.T) {
        TestingT(t)
 }
 
-// Gocheck boilerplate
 var _ = Suite(&TestSuite{})
 
 type TestSuite struct {
-       client *arvados.Client
-       docker *TestDockerClient
-       runner *ContainerRunner
+       client    *arvados.Client
+       api       *ArvTestClient
+       runner    *ContainerRunner
+       executor  *stubExecutor
+       keepmount string
 }
 
 func (s *TestSuite) SetUpTest(c *C) {
+       *brokenNodeHook = ""
        s.client = arvados.NewClientFromEnv()
-       s.docker = NewTestDockerClient()
+       s.executor = &stubExecutor{}
+       var err error
+       s.api = &ArvTestClient{}
+       s.runner, err = NewContainerRunner(s.client, s.api, &KeepTestClient{}, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
+       s.runner.executor = s.executor
+       s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
+               return s.api, &KeepTestClient{}, s.client, nil
+       }
+       s.runner.RunArvMount = func(cmd []string, tok string) (*exec.Cmd, error) {
+               s.runner.ArvMountPoint = s.keepmount
+               return nil, nil
+       }
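+       // Populate a fake keep mount with the fixture docker image
+       // collection and a fake input collection, so tests can
+       // exercise LoadImage and mount setup without running a real
+       // arv-mount process.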
+       s.keepmount = c.MkDir()
+       err = os.Mkdir(s.keepmount+"/by_id", 0755)
+       c.Assert(err, IsNil)
+       err = os.Mkdir(s.keepmount+"/by_id/"+arvadostest.DockerImage112PDH, 0755)
+       c.Assert(err, IsNil)
+       err = ioutil.WriteFile(s.keepmount+"/by_id/"+arvadostest.DockerImage112PDH+"/"+arvadostest.DockerImage112Filename, []byte("#notarealtarball"), 0644)
+       c.Assert(err, IsNil)
+       err = os.Mkdir(s.keepmount+"/by_id/"+fakeInputCollectionPDH, 0755)
+       c.Assert(err, IsNil)
+       err = ioutil.WriteFile(s.keepmount+"/by_id/"+fakeInputCollectionPDH+"/input.json", []byte(`{"input":true}`), 0644)
+       c.Assert(err, IsNil)
+       s.runner.ArvMountPoint = s.keepmount
 }
 
 type ArvTestClient struct {
@@ -72,6 +92,38 @@ type KeepTestClient struct {
        Content []byte
 }
 
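+// stubExecutor is a test double for the runner's container executor:
+// it records the image, container spec, and lifecycle calls it
+// receives, and returns whatever canned errors and exit codes the
+// test has loaded into its fields.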
+type stubExecutor struct {
+       imageLoaded bool
+       loaded      string
+       loadErr     error
+       exitCode    int
+       createErr   error
+       created     containerSpec
+       startErr    error
+       waitSleep   time.Duration
+       waitErr     error
+       stopErr     error
+       stopped     bool
+       closed      bool
+       runFunc     func()
+       exit        chan int
+}
+
+func (e *stubExecutor) ImageLoaded(imageID string) bool { return e.imageLoaded }
+func (e *stubExecutor) LoadImage(filename string) error { e.loaded = filename; return e.loadErr }
+func (e *stubExecutor) Create(spec containerSpec) error { e.created = spec; return e.createErr }
+func (e *stubExecutor) Start() error                    { e.exit = make(chan int, 1); go e.runFunc(); return e.startErr }
+func (e *stubExecutor) CgroupID() string                { return "cgroupid" }
+func (e *stubExecutor) Stop() error                     { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr }
+func (e *stubExecutor) Close()                          { e.closed = true }
+func (e *stubExecutor) Wait(context.Context) (int, error) {
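+       // Close the container's stdout/stderr so log collectors see
+       // EOF, then report the exit code sent by runFunc (or Stop).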
+       defer e.created.Stdout.Close()
+       defer e.created.Stderr.Close()
+       return <-e.exit, e.waitErr
+}
+
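+// fakeInputCollectionPDH is a syntactically valid but otherwise
+// arbitrary portable data hash, used by SetUpTest to stub an input
+// collection in the fake keep mount.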
+const fakeInputCollectionPDH = "ffffffffaaaaaaaa88888888eeeeeeee+1234"
+
 var hwManifest = ". 82ab40c24fc8df01798e57ba66795bb1+841216+Aa124ac75e5168396c73c0a18eda641a4f41791c0@569fa8c3 0:841216:9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7.tar\n"
 var hwPDH = "a45557269dcb65a6b78f9ac061c0850b+120"
 var hwImageID = "9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7"
@@ -92,129 +144,6 @@ var denormalizedWithSubdirsPDH = "b0def87f80dd594d4675809e83bd4f15+367"
 var fakeAuthUUID = "zzzzz-gj3su-55pqoyepgi2glem"
 var fakeAuthToken = "a3ltuwzqcu2u4sc0q7yhpc2w7s00fdcqecg5d6e0u3pfohmbjt"
 
-type TestDockerClient struct {
-       imageLoaded string
-       logReader   io.ReadCloser
-       logWriter   io.WriteCloser
-       fn          func(t *TestDockerClient)
-       exitCode    int
-       stop        chan bool
-       cwd         string
-       env         []string
-       api         *ArvTestClient
-       realTemp    string
-       calledWait  bool
-       ctrExited   bool
-}
-
-func NewTestDockerClient() *TestDockerClient {
-       t := &TestDockerClient{}
-       t.logReader, t.logWriter = io.Pipe()
-       t.stop = make(chan bool, 1)
-       t.cwd = "/"
-       return t
-}
-
-type MockConn struct {
-       net.Conn
-}
-
-func (m *MockConn) Write(b []byte) (int, error) {
-       return len(b), nil
-}
-
-func NewMockConn() *MockConn {
-       c := &MockConn{}
-       return c
-}
-
-func (t *TestDockerClient) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
-       return dockertypes.HijackedResponse{Conn: NewMockConn(), Reader: bufio.NewReader(t.logReader)}, nil
-}
-
-func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
-       if config.WorkingDir != "" {
-               t.cwd = config.WorkingDir
-       }
-       t.env = config.Env
-       return dockercontainer.ContainerCreateCreatedBody{ID: "abcde"}, nil
-}
-
-func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
-       if t.exitCode == 3 {
-               return errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`)
-       }
-       if t.exitCode == 4 {
-               return errors.New(`panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`)
-       }
-       if t.exitCode == 5 {
-               return errors.New(`Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`)
-       }
-       if t.exitCode == 6 {
-               return errors.New(`Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`)
-       }
-
-       if container == "abcde" {
-               // t.fn gets executed in ContainerWait
-               return nil
-       }
-       return errors.New("Invalid container id")
-}
-
-func (t *TestDockerClient) ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error {
-       t.stop <- true
-       return nil
-}
-
-func (t *TestDockerClient) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
-       t.calledWait = true
-       body := make(chan dockercontainer.ContainerWaitOKBody, 1)
-       err := make(chan error)
-       go func() {
-               t.fn(t)
-               body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.exitCode)}
-       }()
-       return body, err
-}
-
-func (t *TestDockerClient) ContainerInspect(ctx context.Context, id string) (c dockertypes.ContainerJSON, err error) {
-       c.ContainerJSONBase = &dockertypes.ContainerJSONBase{}
-       c.ID = "abcde"
-       if t.ctrExited {
-               c.State = &dockertypes.ContainerState{Status: "exited", Dead: true}
-       } else {
-               c.State = &dockertypes.ContainerState{Status: "running", Pid: 1234, Running: true}
-       }
-       return
-}
-
-func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
-       if t.exitCode == 2 {
-               return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
-       }
-
-       if t.imageLoaded == image {
-               return dockertypes.ImageInspect{}, nil, nil
-       }
-       return dockertypes.ImageInspect{}, nil, errors.New("")
-}
-
-func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
-       if t.exitCode == 2 {
-               return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
-       }
-       _, err := io.Copy(ioutil.Discard, input)
-       if err != nil {
-               return dockertypes.ImageLoadResponse{}, err
-       }
-       t.imageLoaded = hwImageID
-       return dockertypes.ImageLoadResponse{Body: ioutil.NopCloser(input)}, nil
-}
-
-func (*TestDockerClient) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
-       return nil, nil
-}
-
 func (client *ArvTestClient) Create(resourceType string,
        parameters arvadosclient.Dict,
        output interface{}) error {
@@ -287,7 +216,7 @@ func (client *ArvTestClient) CallRaw(method, resourceType, uuid, action string,
        } else {
                j = []byte(`{
                        "command": ["sleep", "1"],
-                       "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+                       "container_image": "` + arvadostest.DockerImage112PDH + `",
                        "cwd": ".",
                        "environment": {},
                        "mounts": {"/tmp": {"kind": "tmp"}, "/json": {"kind": "json", "content": {"number": 123456789123456789}}},
@@ -438,49 +367,45 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
 }
 
 func (s *TestSuite) TestLoadImage(c *C) {
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{},
-               &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-       c.Assert(err, IsNil)
-
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr.ContainerArvClient = &ArvTestClient{}
-       cr.ContainerKeepClient = kc
-
-       _, err = cr.Docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{})
-       c.Check(err, IsNil)
-
-       _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageID)
-       c.Check(err, NotNil)
-
-       cr.Container.ContainerImage = hwPDH
-
-       // (1) Test loading image from keep
-       c.Check(kc.Called, Equals, false)
-       c.Check(cr.ContainerConfig.Image, Equals, "")
-
-       err = cr.LoadImage()
-
-       c.Check(err, IsNil)
-       defer func() {
-               cr.Docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{})
-       }()
+       s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
+       s.runner.Container.Mounts = map[string]arvados.Mount{
+               "/out": {Kind: "tmp", Writable: true},
+       }
+       s.runner.Container.OutputPath = "/out"
 
-       c.Check(kc.Called, Equals, true)
-       c.Check(cr.ContainerConfig.Image, Equals, hwImageID)
+       _, err := s.runner.SetupMounts()
+       c.Assert(err, IsNil)
 
-       _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageID)
+       imageID, err := s.runner.LoadImage()
        c.Check(err, IsNil)
-
-       // (2) Test using image that's already loaded
-       kc.Called = false
-       cr.ContainerConfig.Image = ""
-
-       err = cr.LoadImage()
+       c.Check(s.executor.loaded, Matches, ".*"+regexp.QuoteMeta(arvadostest.DockerImage112Filename))
+       c.Check(imageID, Equals, strings.TrimSuffix(arvadostest.DockerImage112Filename, ".tar"))
+
+       s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
+       s.executor.imageLoaded = false
+       s.executor.loaded = ""
+       s.executor.loadErr = errors.New("bork")
+       imageID, err = s.runner.LoadImage()
+       c.Check(err, ErrorMatches, ".*bork")
+       c.Check(s.executor.loaded, Matches, ".*"+regexp.QuoteMeta(arvadostest.DockerImage112Filename))
+
+       s.runner.Container.ContainerImage = fakeInputCollectionPDH
+       s.executor.imageLoaded = false
+       s.executor.loaded = ""
+       s.executor.loadErr = nil
+       imageID, err = s.runner.LoadImage()
+       c.Check(err, ErrorMatches, "image collection does not include a \\.tar image file")
+       c.Check(s.executor.loaded, Equals, "")
+
+       // If the executor reports the image is already loaded, the
+       // executor's LoadImage should not be called.
+       s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
+       s.executor.imageLoaded = true
+       s.executor.loaded = ""
+       s.executor.loadErr = nil
+       imageID, err = s.runner.LoadImage()
        c.Check(err, IsNil)
-       c.Check(kc.Called, Equals, false)
-       c.Check(cr.ContainerConfig.Image, Equals, hwImageID)
-
+       c.Check(s.executor.loaded, Equals, "")
+       c.Check(imageID, Equals, strings.TrimSuffix(arvadostest.DockerImage112Filename, ".tar"))
 }
 
 type ArvErrorTestClient struct{}
@@ -555,65 +480,6 @@ func (KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename
        return ErrorReader{}, nil
 }
 
-func (s *TestSuite) TestLoadImageArvError(c *C) {
-       // (1) Arvados error
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr, err := NewContainerRunner(s.client, &ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-       c.Assert(err, IsNil)
-
-       cr.ContainerArvClient = &ArvErrorTestClient{}
-       cr.ContainerKeepClient = &KeepTestClient{}
-
-       cr.Container.ContainerImage = hwPDH
-
-       err = cr.LoadImage()
-       c.Check(err.Error(), Equals, "While getting container image collection: ArvError")
-}
-
-func (s *TestSuite) TestLoadImageKeepError(c *C) {
-       // (2) Keep error
-       kc := &KeepErrorTestClient{}
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-       c.Assert(err, IsNil)
-
-       cr.ContainerArvClient = &ArvTestClient{}
-       cr.ContainerKeepClient = &KeepErrorTestClient{}
-
-       cr.Container.ContainerImage = hwPDH
-
-       err = cr.LoadImage()
-       c.Assert(err, NotNil)
-       c.Check(err.Error(), Equals, "While creating ManifestFileReader for container image: KeepError")
-}
-
-func (s *TestSuite) TestLoadImageCollectionError(c *C) {
-       // (3) Collection doesn't contain image
-       kc := &KeepReadErrorTestClient{}
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-       c.Assert(err, IsNil)
-       cr.Container.ContainerImage = otherPDH
-
-       cr.ContainerArvClient = &ArvTestClient{}
-       cr.ContainerKeepClient = &KeepReadErrorTestClient{}
-
-       err = cr.LoadImage()
-       c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar")
-}
-
-func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
-       // (4) Collection doesn't contain image
-       kc := &KeepReadErrorTestClient{}
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-       c.Assert(err, IsNil)
-       cr.Container.ContainerImage = hwPDH
-       cr.ContainerArvClient = &ArvTestClient{}
-       cr.ContainerKeepClient = &KeepReadErrorTestClient{}
-
-       err = cr.LoadImage()
-       c.Check(err, NotNil)
-}
-
 type ClosableBuffer struct {
        bytes.Buffer
 }
@@ -647,35 +513,31 @@ func dockerLog(fd byte, msg string) []byte {
 }
 
 func (s *TestSuite) TestRunContainer(c *C) {
-       s.docker.fn = func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, "Hello world\n"))
-               t.logWriter.Close()
+       s.executor.runFunc = func() {
+               fmt.Fprintf(s.executor.created.Stdout, "Hello world\n")
+               s.executor.created.Stdout.Close()
+               s.executor.created.Stderr.Close()
+               s.executor.exit <- 0
        }
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-       c.Assert(err, IsNil)
-
-       cr.ContainerArvClient = &ArvTestClient{}
-       cr.ContainerKeepClient = &KeepTestClient{}
 
        var logs TestLogs
-       cr.NewLogWriter = logs.NewTestLoggingWriter
-       cr.Container.ContainerImage = hwPDH
-       cr.Container.Command = []string{"./hw"}
-       err = cr.LoadImage()
-       c.Check(err, IsNil)
+       s.runner.NewLogWriter = logs.NewTestLoggingWriter
+       s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
+       s.runner.Container.Command = []string{"./hw"}
 
-       err = cr.CreateContainer()
-       c.Check(err, IsNil)
+       imageID, err := s.runner.LoadImage()
+       c.Assert(err, IsNil)
 
-       err = cr.StartContainer()
-       c.Check(err, IsNil)
+       err = s.runner.CreateContainer(imageID, nil)
+       c.Assert(err, IsNil)
 
-       err = cr.WaitFinish()
-       c.Check(err, IsNil)
+       err = s.runner.StartContainer()
+       c.Assert(err, IsNil)
+
+       err = s.runner.WaitFinish()
+       c.Assert(err, IsNil)
 
-       c.Check(strings.HasSuffix(logs.Stdout.String(), "Hello world\n"), Equals, true)
+       c.Check(logs.Stdout.String(), Matches, ".*Hello world\n")
        c.Check(logs.Stderr.String(), Equals, "")
 }
 
@@ -683,7 +545,7 @@ func (s *TestSuite) TestCommitLogs(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
 
@@ -705,7 +567,7 @@ func (s *TestSuite) TestUpdateContainerRunning(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
 
        err = cr.UpdateContainerRunning()
@@ -718,7 +580,7 @@ func (s *TestSuite) TestUpdateContainerComplete(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
 
        cr.LogsPDH = new(string)
@@ -740,7 +602,7 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        cr.cCancelled = true
        cr.finalState = "Cancelled"
@@ -755,10 +617,10 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
 
 // Used by the TestFullRun*() tests below to DRY up the boilerplate setup for a
 // full dress rehearsal of the Run() function, starting from a JSON container record.
-func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
-       rec := arvados.Container{}
-       err := json.Unmarshal([]byte(record), &rec)
-       c.Check(err, IsNil)
+func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func()) (*ArvTestClient, *ContainerRunner, string) {
+       err := json.Unmarshal([]byte(record), &s.api.Container)
+       c.Assert(err, IsNil)
+       initialState := s.api.Container.State
 
        var sm struct {
                SecretMounts map[string]arvados.Mount `json:"secret_mounts"`
@@ -766,33 +628,22 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
        err = json.Unmarshal([]byte(record), &sm)
        c.Check(err, IsNil)
        secretMounts, err := json.Marshal(sm)
-       c.Logf("%s %q", sm, secretMounts)
-       c.Check(err, IsNil)
-
-       s.docker.exitCode = exitCode
-       s.docker.fn = fn
-       s.docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{})
-
-       api = &ArvTestClient{Container: rec}
-       s.docker.api = api
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
-       s.runner = cr
-       cr.statInterval = 100 * time.Millisecond
-       cr.containerWatchdogInterval = time.Second
-       am := &ArvMountCmdLine{}
-       cr.RunArvMount = am.ArvMountTest
+       c.Logf("SecretMounts decoded %v json %q", sm, secretMounts)
 
-       realTemp, err = ioutil.TempDir("", "crunchrun_test1-")
-       c.Assert(err, IsNil)
-       defer os.RemoveAll(realTemp)
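+       // Wrap the caller-supplied fn so the stub executor reports
+       // the requested exit code after fn returns.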
+       s.executor.runFunc = func() {
+               fn()
+               s.executor.exit <- exitCode
+       }
 
-       s.docker.realTemp = realTemp
+       s.runner.statInterval = 100 * time.Millisecond
+       s.runner.containerWatchdogInterval = time.Second
+       am := &ArvMountCmdLine{}
+       s.runner.RunArvMount = am.ArvMountTest
 
+       realTemp := c.MkDir()
        tempcount := 0
-       cr.MkTempDir = func(_ string, prefix string) (string, error) {
+       s.runner.MkTempDir = func(_, prefix string) (string, error) {
                tempcount++
                d := fmt.Sprintf("%s/%s%d", realTemp, prefix, tempcount)
                err := os.Mkdir(d, os.ModePerm)
@@ -802,73 +653,81 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
                }
                return d, err
        }
-       cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
+       s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
                return &ArvTestClient{secretMounts: secretMounts}, &KeepTestClient{}, nil, nil
        }
 
-       if extraMounts != nil && len(extraMounts) > 0 {
+       if len(extraMounts) > 0 {
-               err := cr.SetupArvMountPoint("keep")
+               err := s.runner.SetupArvMountPoint("keep")
                c.Check(err, IsNil)
 
                for _, m := range extraMounts {
-                       os.MkdirAll(cr.ArvMountPoint+"/by_id/"+m, os.ModePerm)
+                       os.MkdirAll(s.runner.ArvMountPoint+"/by_id/"+m, os.ModePerm)
                }
        }
 
-       err = cr.Run()
-       if api.CalledWith("container.state", "Complete") != nil {
+       err = s.runner.Run()
+       if s.api.CalledWith("container.state", "Complete") != nil {
                c.Check(err, IsNil)
        }
-       if exitCode != 2 {
-               c.Check(api.WasSetRunning, Equals, true)
+       if s.executor.loadErr == nil && s.executor.createErr == nil && initialState != "Running" {
+               c.Check(s.api.WasSetRunning, Equals, true)
                var lastupdate arvadosclient.Dict
-               for _, content := range api.Content {
+               for _, content := range s.api.Content {
                        if content["container"] != nil {
                                lastupdate = content["container"].(arvadosclient.Dict)
                        }
                }
                if lastupdate["log"] == nil {
-                       c.Errorf("no container update with non-nil log -- updates were: %v", api.Content)
+                       c.Errorf("no container update with non-nil log -- updates were: %v", s.api.Content)
                }
        }
 
        if err != nil {
-               for k, v := range api.Logs {
+               for k, v := range s.api.Logs {
                        c.Log(k)
                        c.Log(v.String())
                }
        }
 
-       return
+       return s.api, s.runner, realTemp
 }
 
 func (s *TestSuite) TestFullRunHello(c *C) {
-       api, _, _ := s.fullRunHelper(c, `{
+       s.runner.enableMemoryLimit = true
+       s.runner.networkMode = "default"
+       s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": ".",
-    "environment": {},
+    "environment": {"foo":"bar","baz":"waz"},
     "mounts": {"/tmp": {"kind": "tmp"} },
     "output_path": "/tmp",
     "priority": 1,
-    "runtime_constraints": {},
+    "runtime_constraints": {"vcpus":1,"ram":1000000},
     "state": "Locked"
-}`, nil, 0, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, "hello world\n"))
-               t.logWriter.Close()
+}`, nil, 0, func() {
+               c.Check(s.executor.created.Command, DeepEquals, []string{"echo", "hello world"})
+               c.Check(s.executor.created.Image, Equals, "sha256:d8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678")
+               c.Check(s.executor.created.Env, DeepEquals, map[string]string{"foo": "bar", "baz": "waz"})
+               c.Check(s.executor.created.VCPUs, Equals, 1)
+               c.Check(s.executor.created.RAM, Equals, int64(1000000))
+               c.Check(s.executor.created.NetworkMode, Equals, "default")
+               c.Check(s.executor.created.EnableNetwork, Equals, false)
+               fmt.Fprintln(s.executor.created.Stdout, "hello world")
        })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello world\n"), Equals, true)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello world\n")
 
 }
 
 func (s *TestSuite) TestRunAlreadyRunning(c *C) {
        var ran bool
-       api, _, _ := s.fullRunHelper(c, `{
+       s.fullRunHelper(c, `{
     "command": ["sleep", "3"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": ".",
     "environment": {},
     "mounts": {"/tmp": {"kind": "tmp"} },
@@ -877,19 +736,18 @@ func (s *TestSuite) TestRunAlreadyRunning(c *C) {
     "runtime_constraints": {},
     "scheduling_parameters":{"max_run_time": 1},
     "state": "Running"
-}`, nil, 2, func(t *TestDockerClient) {
+}`, nil, 2, func() {
                ran = true
        })
-
-       c.Check(api.CalledWith("container.state", "Cancelled"), IsNil)
-       c.Check(api.CalledWith("container.state", "Complete"), IsNil)
+       c.Check(s.api.CalledWith("container.state", "Cancelled"), IsNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), IsNil)
        c.Check(ran, Equals, false)
 }
 
 func (s *TestSuite) TestRunTimeExceeded(c *C) {
-       api, _, _ := s.fullRunHelper(c, `{
+       s.fullRunHelper(c, `{
     "command": ["sleep", "3"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": ".",
     "environment": {},
     "mounts": {"/tmp": {"kind": "tmp"} },
@@ -898,38 +756,35 @@ func (s *TestSuite) TestRunTimeExceeded(c *C) {
     "runtime_constraints": {},
     "scheduling_parameters":{"max_run_time": 1},
     "state": "Locked"
-}`, nil, 0, func(t *TestDockerClient) {
+}`, nil, 0, func() {
                time.Sleep(3 * time.Second)
-               t.logWriter.Close()
        })
 
-       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*")
+       c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
+       c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*")
 }
 
 func (s *TestSuite) TestContainerWaitFails(c *C) {
-       api, _, _ := s.fullRunHelper(c, `{
+       s.fullRunHelper(c, `{
     "command": ["sleep", "3"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": ".",
     "mounts": {"/tmp": {"kind": "tmp"} },
     "output_path": "/tmp",
     "priority": 1,
     "state": "Locked"
-}`, nil, 0, func(t *TestDockerClient) {
-               t.ctrExited = true
-               time.Sleep(10 * time.Second)
-               t.logWriter.Close()
+}`, nil, 0, func() {
+               s.executor.waitErr = errors.New("Container is not running")
        })
 
-       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*")
+       c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
+       c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*")
 }
 
 func (s *TestSuite) TestCrunchstat(c *C) {
-       api, _, _ := s.fullRunHelper(c, `{
+       s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "container_image": "`+arvadostest.DockerImage112PDH+`",
                "cwd": ".",
                "environment": {},
                "mounts": {"/tmp": {"kind": "tmp"} },
@@ -937,33 +792,32 @@ func (s *TestSuite) TestCrunchstat(c *C) {
                "priority": 1,
                "runtime_constraints": {},
                "state": "Locked"
-       }`, nil, 0, func(t *TestDockerClient) {
+       }`, nil, 0, func() {
                time.Sleep(time.Second)
-               t.logWriter.Close()
        })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
 
        // We didn't actually start a container, so crunchstat didn't
        // find accounting files and therefore didn't log any stats.
        // It should have logged a "can't find accounting files"
        // message after one poll interval, though, so we can confirm
        // it's alive:
-       c.Assert(api.Logs["crunchstat"], NotNil)
-       c.Check(api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files have not appeared after 100ms.*`)
+       c.Assert(s.api.Logs["crunchstat"], NotNil)
+       c.Check(s.api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files have not appeared after 100ms.*`)
 
        // The "files never appeared" log assures us that we called
        // (*crunchstat.Reporter)Stop(), and that we set it up with
        // the correct container ID "abcde":
-       c.Check(api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files never appeared for abcde\n`)
+       c.Check(s.api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files never appeared for cgroupid\n`)
 }
 
 func (s *TestSuite) TestNodeInfoLog(c *C) {
        os.Setenv("SLURMD_NODENAME", "compute2")
-       api, _, _ := s.fullRunHelper(c, `{
+       s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "container_image": "`+arvadostest.DockerImage112PDH+`",
                "cwd": ".",
                "environment": {},
                "mounts": {"/tmp": {"kind": "tmp"} },
@@ -972,22 +826,21 @@ func (s *TestSuite) TestNodeInfoLog(c *C) {
                "runtime_constraints": {},
                "state": "Locked"
        }`, nil, 0,
-               func(t *TestDockerClient) {
+               func() {
                        time.Sleep(time.Second)
-                       t.logWriter.Close()
                })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
 
-       c.Assert(api.Logs["node"], NotNil)
-       json := api.Logs["node"].String()
+       c.Assert(s.api.Logs["node"], NotNil)
+       json := s.api.Logs["node"].String()
        c.Check(json, Matches, `(?ms).*"uuid": *"zzzzz-7ekkf-2z3mc76g2q73aio".*`)
        c.Check(json, Matches, `(?ms).*"total_cpu_cores": *16.*`)
        c.Check(json, Not(Matches), `(?ms).*"info":.*`)
 
-       c.Assert(api.Logs["node-info"], NotNil)
-       json = api.Logs["node-info"].String()
+       c.Assert(s.api.Logs["node-info"], NotNil)
+       json = s.api.Logs["node-info"].String()
        c.Check(json, Matches, `(?ms).*Host Information.*`)
        c.Check(json, Matches, `(?ms).*CPU Information.*`)
        c.Check(json, Matches, `(?ms).*Memory Information.*`)
@@ -996,9 +849,9 @@ func (s *TestSuite) TestNodeInfoLog(c *C) {
 }
 
 func (s *TestSuite) TestContainerRecordLog(c *C) {
-       api, _, _ := s.fullRunHelper(c, `{
+       s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "container_image": "`+arvadostest.DockerImage112PDH+`",
                "cwd": ".",
                "environment": {},
                "mounts": {"/tmp": {"kind": "tmp"} },
@@ -1007,22 +860,21 @@ func (s *TestSuite) TestContainerRecordLog(c *C) {
                "runtime_constraints": {},
                "state": "Locked"
        }`, nil, 0,
-               func(t *TestDockerClient) {
+               func() {
                        time.Sleep(time.Second)
-                       t.logWriter.Close()
                })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
 
-       c.Assert(api.Logs["container"], NotNil)
-       c.Check(api.Logs["container"].String(), Matches, `(?ms).*container_image.*`)
+       c.Assert(s.api.Logs["container"], NotNil)
+       c.Check(s.api.Logs["container"].String(), Matches, `(?ms).*container_image.*`)
 }
 
 func (s *TestSuite) TestFullRunStderr(c *C) {
-       api, _, _ := s.fullRunHelper(c, `{
+       s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": ".",
     "environment": {},
     "mounts": {"/tmp": {"kind": "tmp"} },
@@ -1030,25 +882,24 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 1, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, "hello\n"))
-               t.logWriter.Write(dockerLog(2, "world\n"))
-               t.logWriter.Close()
+}`, nil, 1, func() {
+               fmt.Fprintln(s.executor.created.Stdout, "hello")
+               fmt.Fprintln(s.executor.created.Stderr, "world")
        })
 
-       final := api.CalledWith("container.state", "Complete")
+       final := s.api.CalledWith("container.state", "Complete")
        c.Assert(final, NotNil)
        c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
        c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
 
-       c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello\n"), Equals, true)
-       c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
+       c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello\n")
+       c.Check(s.api.Logs["stderr"].String(), Matches, ".*world\n")
 }
 
 func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
-       api, _, _ := s.fullRunHelper(c, `{
+       s.fullRunHelper(c, `{
     "command": ["pwd"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": ".",
     "environment": {},
     "mounts": {"/tmp": {"kind": "tmp"} },
@@ -1056,21 +907,20 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 0, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
-               t.logWriter.Close()
+}`, nil, 0, func() {
+               fmt.Fprintf(s.executor.created.Stdout, "workdir=%q", s.executor.created.WorkingDir)
        })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Log(api.Logs["stdout"])
-       c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+       c.Log(s.api.Logs["stdout"])
+       c.Check(s.api.Logs["stdout"].String(), Matches, `.*workdir=""\n`)
 }
 
 func (s *TestSuite) TestFullRunSetCwd(c *C) {
-       api, _, _ := s.fullRunHelper(c, `{
+       s.fullRunHelper(c, `{
     "command": ["pwd"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": "/bin",
     "environment": {},
     "mounts": {"/tmp": {"kind": "tmp"} },
@@ -1078,41 +928,37 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 0, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
-               t.logWriter.Close()
+}`, nil, 0, func() {
+               fmt.Fprintln(s.executor.created.Stdout, s.executor.created.WorkingDir)
        })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(s.api.Logs["stdout"].String(), Matches, ".*/bin\n")
 }
 
 func (s *TestSuite) TestStopOnSignal(c *C) {
-       s.testStopContainer(c, func(cr *ContainerRunner) {
-               go func() {
-                       for !s.docker.calledWait {
-                               time.Sleep(time.Millisecond)
-                       }
-                       cr.SigChan <- syscall.SIGINT
-               }()
-       })
+       s.executor.runFunc = func() {
+               s.executor.created.Stdout.Write([]byte("foo\n"))
+               s.runner.SigChan <- syscall.SIGINT
+       }
+       s.testStopContainer(c)
 }
 
 func (s *TestSuite) TestStopOnArvMountDeath(c *C) {
-       s.testStopContainer(c, func(cr *ContainerRunner) {
-               cr.ArvMountExit = make(chan error)
-               go func() {
-                       cr.ArvMountExit <- exec.Command("true").Run()
-                       close(cr.ArvMountExit)
-               }()
-       })
+       s.executor.runFunc = func() {
+               s.executor.created.Stdout.Write([]byte("foo\n"))
+               s.runner.ArvMountExit <- nil
+               close(s.runner.ArvMountExit)
+       }
+       s.runner.ArvMountExit = make(chan error)
+       s.testStopContainer(c)
 }
 
-func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
+func (s *TestSuite) testStopContainer(c *C) {
        record := `{
     "command": ["/bin/sh", "-c", "echo foo && sleep 30 && echo bar"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "` + arvadostest.DockerImage112PDH + `",
     "cwd": ".",
     "environment": {},
     "mounts": {"/tmp": {"kind": "tmp"} },
@@ -1122,31 +968,17 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
     "state": "Locked"
 }`
 
-       rec := arvados.Container{}
-       err := json.Unmarshal([]byte(record), &rec)
-       c.Check(err, IsNil)
-
-       s.docker.fn = func(t *TestDockerClient) {
-               <-t.stop
-               t.logWriter.Write(dockerLog(1, "foo\n"))
-               t.logWriter.Close()
-       }
-       s.docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{})
-
-       api := &ArvTestClient{Container: rec}
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       err := json.Unmarshal([]byte(record), &s.api.Container)
        c.Assert(err, IsNil)
-       cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
-       cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
+
+       s.runner.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
+       s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
                return &ArvTestClient{}, &KeepTestClient{}, nil, nil
        }
-       setup(cr)
 
        done := make(chan error)
        go func() {
-               done <- cr.Run()
+               done <- s.runner.Run()
        }()
        select {
        case <-time.After(20 * time.Second):
@@ -1155,20 +987,20 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
        case err = <-done:
                c.Check(err, IsNil)
        }
-       for k, v := range api.Logs {
+       for k, v := range s.api.Logs {
                c.Log(k)
-               c.Log(v.String())
+               c.Log(v.String(), "\n")
        }
 
-       c.Check(api.CalledWith("container.log", nil), NotNil)
-       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
-       c.Check(api.Logs["stdout"].String(), Matches, "(?ms).*foo\n$")
+       c.Check(s.api.CalledWith("container.log", nil), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
+       c.Check(s.api.Logs["stdout"].String(), Matches, "(?ms).*foo\n$")
 }
 
 func (s *TestSuite) TestFullRunSetEnv(c *C) {
-       api, _, _ := s.fullRunHelper(c, `{
+       s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $FROBIZ"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": "/bin",
     "environment": {"FROBIZ": "bilbo"},
     "mounts": {"/tmp": {"kind": "tmp"} },
@@ -1176,14 +1008,13 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 0, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
-               t.logWriter.Close()
+}`, nil, 0, func() {
+               fmt.Fprintf(s.executor.created.Stdout, "%v", s.executor.created.Env)
        })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "bilbo\n"), Equals, true)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(s.api.Logs["stdout"].String(), Matches, `.*map\[FROBIZ:bilbo\]\n`)
 }
 
 type ArvMountCmdLine struct {
@@ -1206,27 +1037,17 @@ func stubCert(temp string) string {
 }
 
 func (s *TestSuite) TestSetupMounts(c *C) {
-       api := &ArvTestClient{}
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-       c.Assert(err, IsNil)
+       cr := s.runner
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
        cr.ContainerArvClient = &ArvTestClient{}
        cr.ContainerKeepClient = &KeepTestClient{}
 
-       realTemp, err := ioutil.TempDir("", "crunchrun_test1-")
-       c.Assert(err, IsNil)
-       certTemp, err := ioutil.TempDir("", "crunchrun_test2-")
-       c.Assert(err, IsNil)
+       realTemp := c.MkDir()
+       certTemp := c.MkDir()
        stubCertPath := stubCert(certTemp)
-
        cr.parentTemp = realTemp
 
-       defer os.RemoveAll(realTemp)
-       defer os.RemoveAll(certTemp)
-
        i := 0
        cr.MkTempDir = func(_ string, prefix string) (string, error) {
                i++
@@ -1255,12 +1076,12 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
                cr.Container.OutputPath = "/tmp"
                cr.statInterval = 5 * time.Second
-               err := cr.SetupMounts()
+               bindmounts, err := cr.SetupMounts()
                c.Check(err, IsNil)
                c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
                        "--read-write", "--crunchstat-interval=5",
                        "--mount-by-pdh", "by_id", realTemp + "/keep1"})
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp"})
+               c.Check(bindmounts, DeepEquals, map[string]bindmount{"/tmp": {realTemp + "/tmp2", false}})
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()
@@ -1274,12 +1095,12 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
                cr.Container.OutputPath = "/out"
 
-               err := cr.SetupMounts()
+               bindmounts, err := cr.SetupMounts()
                c.Check(err, IsNil)
                c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
                        "--read-write", "--crunchstat-interval=5",
                        "--mount-by-pdh", "by_id", realTemp + "/keep1"})
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/out", realTemp + "/tmp3:/tmp"})
+               c.Check(bindmounts, DeepEquals, map[string]bindmount{"/out": {realTemp + "/tmp2", false}, "/tmp": {realTemp + "/tmp3", false}})
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()
@@ -1293,12 +1114,12 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                cr.Container.OutputPath = "/tmp"
                cr.Container.RuntimeConstraints.API = true
 
-               err := cr.SetupMounts()
+               bindmounts, err := cr.SetupMounts()
                c.Check(err, IsNil)
                c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
                        "--read-write", "--crunchstat-interval=5",
                        "--mount-by-pdh", "by_id", realTemp + "/keep1"})
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp", stubCertPath + ":/etc/arvados/ca-certificates.crt:ro"})
+               c.Check(bindmounts, DeepEquals, map[string]bindmount{"/tmp": {realTemp + "/tmp2", false}, "/etc/arvados/ca-certificates.crt": {stubCertPath, true}})
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()
@@ -1316,12 +1137,12 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
                os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
 
-               err := cr.SetupMounts()
+               bindmounts, err := cr.SetupMounts()
                c.Check(err, IsNil)
                c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
                        "--read-write", "--crunchstat-interval=5",
                        "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/tmp0:/keeptmp"})
+               c.Check(bindmounts, DeepEquals, map[string]bindmount{"/keeptmp": {realTemp + "/keep1/tmp0", false}})
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()
@@ -1339,14 +1160,15 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
                os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
 
-               err := cr.SetupMounts()
+               bindmounts, err := cr.SetupMounts()
                c.Check(err, IsNil)
                c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
                        "--read-write", "--crunchstat-interval=5",
                        "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
-               sort.StringSlice(cr.Binds).Sort()
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
-                       realTemp + "/keep1/tmp0:/keepout"})
+               c.Check(bindmounts, DeepEquals, map[string]bindmount{
+                       "/keepinp": {realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", true},
+                       "/keepout": {realTemp + "/keep1/tmp0", false},
+               })
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()
@@ -1365,14 +1187,15 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
                os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
 
-               err := cr.SetupMounts()
+               bindmounts, err := cr.SetupMounts()
                c.Check(err, IsNil)
                c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
                        "--read-write", "--crunchstat-interval=5",
                        "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
-               sort.StringSlice(cr.Binds).Sort()
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
-                       realTemp + "/keep1/tmp0:/keepout"})
+               c.Check(bindmounts, DeepEquals, map[string]bindmount{
+                       "/keepinp": {realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", true},
+                       "/keepout": {realTemp + "/keep1/tmp0", false},
+               })
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()
@@ -1391,10 +1214,11 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                cr.Container.Mounts = map[string]arvados.Mount{
                        "/mnt/test.json": {Kind: "json", Content: test.in},
                }
-               err := cr.SetupMounts()
+               bindmounts, err := cr.SetupMounts()
                c.Check(err, IsNil)
-               sort.StringSlice(cr.Binds).Sort()
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/json2/mountdata.json:/mnt/test.json:ro"})
+               c.Check(bindmounts, DeepEquals, map[string]bindmount{
+                       "/mnt/test.json": {realTemp + "/json2/mountdata.json", true},
+               })
                content, err := ioutil.ReadFile(realTemp + "/json2/mountdata.json")
                c.Check(err, IsNil)
                c.Check(content, DeepEquals, []byte(test.out))
@@ -1416,13 +1240,14 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                cr.Container.Mounts = map[string]arvados.Mount{
                        "/mnt/test.txt": {Kind: "text", Content: test.in},
                }
-               err := cr.SetupMounts()
+               bindmounts, err := cr.SetupMounts()
                if test.out == "error" {
                        c.Check(err.Error(), Equals, "content for mount \"/mnt/test.txt\" must be a string")
                } else {
                        c.Check(err, IsNil)
-                       sort.StringSlice(cr.Binds).Sort()
-                       c.Check(cr.Binds, DeepEquals, []string{realTemp + "/text2/mountdata.text:/mnt/test.txt:ro"})
+                       c.Check(bindmounts, DeepEquals, map[string]bindmount{
+                               "/mnt/test.txt": {realTemp + "/text2/mountdata.text", true},
+                       })
                        content, err := ioutil.ReadFile(realTemp + "/text2/mountdata.text")
                        c.Check(err, IsNil)
                        c.Check(content, DeepEquals, []byte(test.out))
@@ -1445,12 +1270,15 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
                os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
 
-               err := cr.SetupMounts()
+               bindmounts, err := cr.SetupMounts()
                c.Check(err, IsNil)
                c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
                        "--read-write", "--crunchstat-interval=5",
                        "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp", realTemp + "/keep1/tmp0:/tmp/foo:ro"})
+               c.Check(bindmounts, DeepEquals, map[string]bindmount{
+                       "/tmp":     {realTemp + "/tmp2", false},
+                       "/tmp/foo": {realTemp + "/keep1/tmp0", true},
+               })
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()
@@ -1480,7 +1308,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                rf.Write([]byte("bar"))
                rf.Close()
 
-               err := cr.SetupMounts()
+               _, err := cr.SetupMounts()
                c.Check(err, IsNil)
                _, err = os.Stat(cr.HostOutputDir + "/foo")
                c.Check(err, IsNil)
@@ -1502,7 +1330,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                }
                cr.Container.OutputPath = "/tmp"
 
-               err := cr.SetupMounts()
+               _, err := cr.SetupMounts()
                c.Check(err, NotNil)
                c.Check(err, ErrorMatches, `only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path.*`)
                os.RemoveAll(cr.ArvMountPoint)
@@ -1519,7 +1347,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                        "stdin": {Kind: "tmp"},
                }
 
-               err := cr.SetupMounts()
+               _, err := cr.SetupMounts()
                c.Check(err, NotNil)
                c.Check(err, ErrorMatches, `unsupported mount kind 'tmp' for stdin.*`)
                os.RemoveAll(cr.ArvMountPoint)
@@ -1550,34 +1378,24 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                }
                cr.Container.OutputPath = "/tmp"
 
-               err := cr.SetupMounts()
+               bindmounts, err := cr.SetupMounts()
                c.Check(err, IsNil)
 
-               // dirMap[mountpoint] == tmpdir
-               dirMap := make(map[string]string)
-               for _, bind := range cr.Binds {
-                       tokens := strings.Split(bind, ":")
-                       dirMap[tokens[1]] = tokens[0]
-
-                       if cr.Container.Mounts[tokens[1]].Writable {
-                               c.Check(len(tokens), Equals, 2)
-                       } else {
-                               c.Check(len(tokens), Equals, 3)
-                               c.Check(tokens[2], Equals, "ro")
-                       }
+               for path, mount := range bindmounts {
+                       c.Check(mount.ReadOnly, Equals, !cr.Container.Mounts[path].Writable, Commentf("%s %#v", path, mount))
                }
 
-               data, err := ioutil.ReadFile(dirMap["/tip"] + "/dir1/dir2/file with mode 0644")
+               data, err := ioutil.ReadFile(bindmounts["/tip"].HostPath + "/dir1/dir2/file with mode 0644")
                c.Check(err, IsNil)
                c.Check(string(data), Equals, "\000\001\002\003")
-               _, err = ioutil.ReadFile(dirMap["/tip"] + "/file only on testbranch")
+               _, err = ioutil.ReadFile(bindmounts["/tip"].HostPath + "/file only on testbranch")
                c.Check(err, FitsTypeOf, &os.PathError{})
                c.Check(os.IsNotExist(err), Equals, true)
 
-               data, err = ioutil.ReadFile(dirMap["/non-tip"] + "/dir1/dir2/file with mode 0644")
+               data, err = ioutil.ReadFile(bindmounts["/non-tip"].HostPath + "/dir1/dir2/file with mode 0644")
                c.Check(err, IsNil)
                c.Check(string(data), Equals, "\000\001\002\003")
-               data, err = ioutil.ReadFile(dirMap["/non-tip"] + "/file only on testbranch")
+               data, err = ioutil.ReadFile(bindmounts["/non-tip"].HostPath + "/file only on testbranch")
                c.Check(err, IsNil)
                c.Check(string(data), Equals, "testfile\n")
 
@@ -1589,7 +1407,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 func (s *TestSuite) TestStdout(c *C) {
        helperRecord := `{
                "command": ["/bin/sh", "-c", "echo $FROBIZ"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "container_image": "` + arvadostest.DockerImage112PDH + `",
                "cwd": "/bin",
                "environment": {"FROBIZ": "bilbo"},
                "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"} },
@@ -1599,38 +1417,25 @@ func (s *TestSuite) TestStdout(c *C) {
                "state": "Locked"
        }`
 
-       api, cr, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
-               t.logWriter.Close()
+       s.fullRunHelper(c, helperRecord, nil, 0, func() {
+               fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
        })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
 }
 
 // Used by the TestStdoutWithWrongPath*()
-func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
-       rec := arvados.Container{}
-       err = json.Unmarshal([]byte(record), &rec)
-       c.Check(err, IsNil)
-
-       s.docker.fn = fn
-       s.docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{})
-
-       api = &ArvTestClient{Container: rec}
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func()) (*ArvTestClient, *ContainerRunner, error) {
+       err := json.Unmarshal([]byte(record), &s.api.Container)
        c.Assert(err, IsNil)
-       am := &ArvMountCmdLine{}
-       cr.RunArvMount = am.ArvMountTest
-       cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
-               return &ArvTestClient{}, &KeepTestClient{}, nil, nil
+       s.executor.runFunc = fn
+       s.runner.RunArvMount = (&ArvMountCmdLine{}).ArvMountTest
+       s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
+               return s.api, &KeepTestClient{}, nil, nil
        }
-
-       err = cr.Run()
-       return
+       return s.api, s.runner, s.runner.Run()
 }
 
 func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
@@ -1638,10 +1443,8 @@ func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} },
     "output_path": "/tmp",
     "state": "Locked"
-}`, func(t *TestDockerClient) {})
-
-       c.Check(err, NotNil)
-       c.Check(strings.Contains(err.Error(), "Stdout path does not start with OutputPath"), Equals, true)
+}`, func() {})
+       c.Check(err, ErrorMatches, ".*Stdout path does not start with OutputPath.*")
 }
 
 func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
@@ -1649,10 +1452,8 @@ func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} },
     "output_path": "/tmp",
     "state": "Locked"
-}`, func(t *TestDockerClient) {})
-
-       c.Check(err, NotNil)
-       c.Check(strings.Contains(err.Error(), "unsupported mount kind 'tmp' for stdout"), Equals, true)
+}`, func() {})
+       c.Check(err, ErrorMatches, ".*unsupported mount kind 'tmp' for stdout.*")
 }
 
 func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
@@ -1660,18 +1461,14 @@ func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} },
     "output_path": "/tmp",
     "state": "Locked"
-}`, func(t *TestDockerClient) {})
-
-       c.Check(err, NotNil)
-       c.Check(strings.Contains(err.Error(), "unsupported mount kind 'collection' for stdout"), Equals, true)
+}`, func() {})
+       c.Check(err, ErrorMatches, ".*unsupported mount kind 'collection' for stdout.*")
 }
 
 func (s *TestSuite) TestFullRunWithAPI(c *C) {
-       defer os.Setenv("ARVADOS_API_HOST", os.Getenv("ARVADOS_API_HOST"))
-       os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
-       api, _, _ := s.fullRunHelper(c, `{
-    "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+       s.fullRunHelper(c, `{
+    "command": ["/bin/sh", "-c", "true $ARVADOS_API_HOST"],
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": "/bin",
     "environment": {},
     "mounts": {"/tmp": {"kind": "tmp"} },
@@ -1679,23 +1476,20 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
     "priority": 1,
     "runtime_constraints": {"API": true},
     "state": "Locked"
-}`, nil, 0, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, t.env[1][17:]+"\n"))
-               t.logWriter.Close()
+}`, nil, 0, func() {
+               c.Check(s.executor.created.Env["ARVADOS_API_HOST"], Equals, os.Getenv("ARVADOS_API_HOST"))
+               s.executor.exit <- 3
        })
-
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "test.arvados.org\n"), Equals, true)
-       c.Check(api.CalledWith("container.output", "d41d8cd98f00b204e9800998ecf8427e+0"), NotNil)
+       c.Check(s.api.CalledWith("container.exit_code", 3), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
 }
 
 func (s *TestSuite) TestFullRunSetOutput(c *C) {
        defer os.Setenv("ARVADOS_API_HOST", os.Getenv("ARVADOS_API_HOST"))
        os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
-       api, _, _ := s.fullRunHelper(c, `{
+       s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": "/bin",
     "environment": {},
     "mounts": {"/tmp": {"kind": "tmp"} },
@@ -1703,20 +1497,19 @@ func (s *TestSuite) TestFullRunSetOutput(c *C) {
     "priority": 1,
     "runtime_constraints": {"API": true},
     "state": "Locked"
-}`, nil, 0, func(t *TestDockerClient) {
-               t.api.Container.Output = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
-               t.logWriter.Close()
+}`, nil, 0, func() {
+               s.api.Container.Output = arvadostest.DockerImage112PDH
        })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(api.CalledWith("container.output", "d4ab34d3d4f8a72f5c4973051ae69fab+122"), NotNil)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(s.api.CalledWith("container.output", arvadostest.DockerImage112PDH), NotNil)
 }
 
 func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C) {
        helperRecord := `{
                "command": ["/bin/sh", "-c", "echo $FROBIZ"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "container_image": "` + arvadostest.DockerImage112PDH + `",
                "cwd": "/bin",
                "environment": {"FROBIZ": "bilbo"},
                "mounts": {
@@ -1735,20 +1528,19 @@ func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C
 
        extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
 
-       api, cr, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
-               t.logWriter.Close()
+       s.fullRunHelper(c, helperRecord, extraMounts, 0, func() {
+               fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
        })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
 }
 
 func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
        helperRecord := `{
                "command": ["/bin/sh", "-c", "echo $FROBIZ"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "container_image": "` + arvadostest.DockerImage112PDH + `",
                "cwd": "/bin",
                "environment": {"FROBIZ": "bilbo"},
                "mounts": {
@@ -1771,16 +1563,16 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
                "a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt",
        }
 
-       api, runner, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
-               t.logWriter.Close()
+       api, _, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func() {
+               fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
        })
 
-       c.Check(runner.Binds, DeepEquals, []string{realtemp + "/tmp2:/tmp",
-               realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/file2_in_main.txt:/tmp/foo/bar:ro",
-               realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt:/tmp/foo/baz/sub2file2:ro",
-               realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1:/tmp/foo/sub1:ro",
-               realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt:/tmp/foo/sub1file2:ro",
+       c.Check(s.executor.created.BindMounts, DeepEquals, map[string]bindmount{
+               "/tmp":                   {realtemp + "/tmp1", false},
+               "/tmp/foo/bar":           {s.keepmount + "/by_id/a0def87f80dd594d4675809e83bd4f15+367/file2_in_main.txt", true},
+               "/tmp/foo/baz/sub2file2": {s.keepmount + "/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt", true},
+               "/tmp/foo/sub1":          {s.keepmount + "/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1", true},
+               "/tmp/foo/sub1file2":     {s.keepmount + "/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt", true},
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -1806,7 +1598,7 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
 func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(c *C) {
        helperRecord := `{
                "command": ["/bin/sh", "-c", "echo $FROBIZ"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "container_image": "` + arvadostest.DockerImage112PDH + `",
                "cwd": "/bin",
                "environment": {"FROBIZ": "bilbo"},
                "mounts": {
@@ -1824,14 +1616,13 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
                "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
        }
 
-       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
-               t.logWriter.Close()
+       s.fullRunHelper(c, helperRecord, extraMounts, 0, func() {
+               fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
        })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       for _, v := range api.Content {
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+       for _, v := range s.api.Content {
                if v["collection"] != nil {
                        collection := v["collection"].(arvadosclient.Dict)
                        if strings.Index(collection["name"].(string), "output") == 0 {
@@ -1848,7 +1639,7 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
 func (s *TestSuite) TestOutputError(c *C) {
        helperRecord := `{
                "command": ["/bin/sh", "-c", "echo $FROBIZ"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "container_image": "` + arvadostest.DockerImage112PDH + `",
                "cwd": "/bin",
                "environment": {"FROBIZ": "bilbo"},
                "mounts": {
@@ -1859,21 +1650,17 @@ func (s *TestSuite) TestOutputError(c *C) {
                "runtime_constraints": {},
                "state": "Locked"
        }`
-
-       extraMounts := []string{}
-
-       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
-               os.Symlink("/etc/hosts", t.realTemp+"/tmp2/baz")
-               t.logWriter.Close()
+       s.fullRunHelper(c, helperRecord, nil, 0, func() {
+               os.Symlink("/etc/hosts", s.runner.HostOutputDir+"/baz")
        })
 
-       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
 }
 
 func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
        helperRecord := `{
                "command": ["/bin/sh", "-c", "echo $FROBIZ"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "container_image": "` + arvadostest.DockerImage112PDH + `",
                "cwd": "/bin",
                "environment": {"FROBIZ": "bilbo"},
                "mounts": {
@@ -1891,9 +1678,8 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
                "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
        }
 
-       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
-               t.logWriter.Close()
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func() {
+               fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -1913,7 +1699,7 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
 func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
        helperRecord := `{
                "command": ["/bin/sh", "-c", "echo $FROBIZ"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "container_image": "` + arvadostest.DockerImage112PDH + `",
                "cwd": "/bin",
                "environment": {"FROBIZ": "bilbo"},
                "mounts": {
@@ -1927,9 +1713,8 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
                "state": "Locked"
        }`
 
-       api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
-               t.logWriter.Close()
+       api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func() {
+               fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -1949,7 +1734,7 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
 func (s *TestSuite) TestStderrMount(c *C) {
        api, cr, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo hello;exit 1"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": ".",
     "environment": {},
     "mounts": {"/tmp": {"kind": "tmp"},
@@ -1959,10 +1744,9 @@ func (s *TestSuite) TestStderrMount(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 1, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, "hello\n"))
-               t.logWriter.Write(dockerLog(2, "oops\n"))
-               t.logWriter.Close()
+}`, nil, 1, func() {
+               fmt.Fprintln(s.executor.created.Stdout, "hello")
+               fmt.Fprintln(s.executor.created.Stderr, "oops")
        })
 
        final := api.CalledWith("container.state", "Complete")
@@ -1974,131 +1758,37 @@ func (s *TestSuite) TestStderrMount(c *C) {
 }
 
 func (s *TestSuite) TestNumberRoundTrip(c *C) {
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       s.api.callraw = true
+       err := s.runner.fetchContainerRecord()
        c.Assert(err, IsNil)
-       cr.fetchContainerRecord()
-
-       jsondata, err := json.Marshal(cr.Container.Mounts["/json"].Content)
-
+       jsondata, err := json.Marshal(s.runner.Container.Mounts["/json"].Content)
+       c.Logf("%#v", s.runner.Container)
        c.Check(err, IsNil)
        c.Check(string(jsondata), Equals, `{"number":123456789123456789}`)
 }
 
-func (s *TestSuite) TestFullBrokenDocker1(c *C) {
-       tf, err := ioutil.TempFile("", "brokenNodeHook-")
-       c.Assert(err, IsNil)
-       defer os.Remove(tf.Name())
-
-       tf.Write([]byte(`#!/bin/sh
-exec echo killme
-`))
-       tf.Close()
-       os.Chmod(tf.Name(), 0700)
-
-       ech := tf.Name()
-       brokenNodeHook = &ech
-
-       api, _, _ := s.fullRunHelper(c, `{
-    "command": ["echo", "hello world"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
-    "cwd": ".",
-    "environment": {},
-    "mounts": {"/tmp": {"kind": "tmp"} },
-    "output_path": "/tmp",
-    "priority": 1,
-    "runtime_constraints": {},
-    "state": "Locked"
-}`, nil, 2, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, "hello world\n"))
-               t.logWriter.Close()
-       })
-
-       c.Check(api.CalledWith("container.state", "Queued"), NotNil)
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
-
-}
-
-func (s *TestSuite) TestFullBrokenDocker2(c *C) {
-       ech := ""
-       brokenNodeHook = &ech
-
-       api, _, _ := s.fullRunHelper(c, `{
-    "command": ["echo", "hello world"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
-    "cwd": ".",
-    "environment": {},
-    "mounts": {"/tmp": {"kind": "tmp"} },
-    "output_path": "/tmp",
-    "priority": 1,
-    "runtime_constraints": {},
-    "state": "Locked"
-}`, nil, 2, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, "hello world\n"))
-               t.logWriter.Close()
-       })
-
-       c.Check(api.CalledWith("container.state", "Queued"), NotNil)
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
-}
-
-func (s *TestSuite) TestFullBrokenDocker3(c *C) {
-       ech := ""
-       brokenNodeHook = &ech
-
-       api, _, _ := s.fullRunHelper(c, `{
-    "command": ["echo", "hello world"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
-    "cwd": ".",
-    "environment": {},
-    "mounts": {"/tmp": {"kind": "tmp"} },
-    "output_path": "/tmp",
-    "priority": 1,
-    "runtime_constraints": {},
-    "state": "Locked"
-}`, nil, 3, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, "hello world\n"))
-               t.logWriter.Close()
-       })
-
-       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
-}
-
-func (s *TestSuite) TestBadCommand1(c *C) {
-       ech := ""
-       brokenNodeHook = &ech
-
-       api, _, _ := s.fullRunHelper(c, `{
-    "command": ["echo", "hello world"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
-    "cwd": ".",
-    "environment": {},
-    "mounts": {"/tmp": {"kind": "tmp"} },
-    "output_path": "/tmp",
-    "priority": 1,
-    "runtime_constraints": {},
-    "state": "Locked"
-}`, nil, 4, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, "hello world\n"))
-               t.logWriter.Close()
-       })
-
-       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
-}
-
-func (s *TestSuite) TestBadCommand2(c *C) {
-       ech := ""
-       brokenNodeHook = &ech
-
-       api, _, _ := s.fullRunHelper(c, `{
+func (s *TestSuite) TestFullBrokenDocker(c *C) {
+       nextState := ""
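+       // Each setup function below arranges a different docker failure mode;
+       // nextState is the container state crunch-run is expected to report afterwards.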
+       for _, setup := range []func(){
+               func() {
+                       c.Log("// waitErr = ocl runtime error")
+                       s.executor.waitErr = errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`)
+                       nextState = "Cancelled"
+               },
+               func() {
+                       c.Log("// loadErr = cannot connect")
+                       s.executor.loadErr = errors.New("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+                       *brokenNodeHook = c.MkDir() + "/broken-node-hook"
+                       err := ioutil.WriteFile(*brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
+                       c.Assert(err, IsNil)
+                       nextState = "Queued"
+               },
+       } {
+               s.SetUpTest(c)
+               setup()
+               s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": ".",
     "environment": {},
     "mounts": {"/tmp": {"kind": "tmp"} },
@@ -2106,22 +1796,30 @@ func (s *TestSuite) TestBadCommand2(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 5, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, "hello world\n"))
-               t.logWriter.Close()
-       })
-
-       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
+}`, nil, 0, func() {})
+               c.Check(s.api.CalledWith("container.state", nextState), NotNil)
+               c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+               if *brokenNodeHook != "" {
+                       c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
+                       c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
+                       c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
+               } else {
+                       c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
+               }
+       }
 }
 
-func (s *TestSuite) TestBadCommand3(c *C) {
-       ech := ""
-       brokenNodeHook = &ech
-
-       api, _, _ := s.fullRunHelper(c, `{
+func (s *TestSuite) TestBadCommand(c *C) {
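+       // Each startError mimics a "cannot start container" failure from docker;
+       // all of them should cancel the container and log likely causes.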
+       for _, startError := range []string{
+               `panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`,
+               `Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`,
+               `Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`,
+       } {
+               s.SetUpTest(c)
+               s.executor.startErr = errors.New(startError)
+               s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
-    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
     "cwd": ".",
     "environment": {},
     "mounts": {"/tmp": {"kind": "tmp"} },
@@ -2129,20 +1827,16 @@ func (s *TestSuite) TestBadCommand3(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 6, func(t *TestDockerClient) {
-               t.logWriter.Write(dockerLog(1, "hello world\n"))
-               t.logWriter.Close()
-       })
-
-       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
+}`, nil, 0, func() {})
+               c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
+               c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
+       }
 }
 
 func (s *TestSuite) TestSecretTextMountPoint(c *C) {
-       // under normal mounts, gets captured in output, oops
        helperRecord := `{
                "command": ["true"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "container_image": "` + arvadostest.DockerImage112PDH + `",
                "cwd": "/bin",
                "mounts": {
                     "/tmp": {"kind": "tmp"},
@@ -2156,22 +1850,21 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
                "state": "Locked"
        }`
 
-       api, cr, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
-               content, err := ioutil.ReadFile(t.realTemp + "/tmp2/secret.conf")
+       s.fullRunHelper(c, helperRecord, nil, 0, func() {
+               content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf")
                c.Check(err, IsNil)
-               c.Check(content, DeepEquals, []byte("mypassword"))
-               t.logWriter.Close()
+               c.Check(string(content), Equals, "mypassword")
        })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), NotNil)
-       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), IsNil)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), NotNil)
+       c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), IsNil)
 
        // under secret mounts, not captured in output
        helperRecord = `{
                "command": ["true"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "container_image": "` + arvadostest.DockerImage112PDH + `",
                "cwd": "/bin",
                "mounts": {
                     "/tmp": {"kind": "tmp"}
@@ -2185,17 +1878,17 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
                "state": "Locked"
        }`
 
-       api, cr, _ = s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
-               content, err := ioutil.ReadFile(t.realTemp + "/tmp2/secret.conf")
+       s.SetUpTest(c)
+       s.fullRunHelper(c, helperRecord, nil, 0, func() {
+               content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf")
                c.Check(err, IsNil)
-               c.Check(content, DeepEquals, []byte("mypassword"))
-               t.logWriter.Close()
+               c.Check(string(content), Equals, "mypassword")
        })
 
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), IsNil)
-       c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), NotNil)
+       c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), IsNil)
+       c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), NotNil)
 }
 
 type FakeProcess struct {
diff --git a/lib/crunchrun/docker.go b/lib/crunchrun/docker.go
new file mode 100644 (file)
index 0000000..32c60e6
--- /dev/null
@@ -0,0 +1,263 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "fmt"
+       "io"
+       "io/ioutil"
+       "os"
+       "strings"
+       "time"
+
+       dockertypes "github.com/docker/docker/api/types"
+       dockercontainer "github.com/docker/docker/api/types/container"
+       dockerclient "github.com/docker/docker/client"
+       "golang.org/x/net/context"
+)
+
+// Docker daemon won't let you set a memory limit less than ~10 MiB, so use 16 MiB as the floor.
+const minDockerRAM = int64(16 * 1024 * 1024)
+
+type dockerExecutor struct {
+       containerUUID    string
+       logf             func(string, ...interface{})
+       watchdogInterval time.Duration
+       dockerclient     *dockerclient.Client
+       containerID      string
+       doneIO           chan struct{}
+       errIO            error
+}
+
+func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) {
+       // API version 1.21 corresponds to Docker 1.9, which is
+       // currently the minimum version we want to support.
+       client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
+       if watchdogInterval < 1 {
+               watchdogInterval = time.Minute
+       }
+       return &dockerExecutor{
+               containerUUID:    containerUUID,
+               logf:             logf,
+               watchdogInterval: watchdogInterval,
+               dockerclient:     client,
+       }, err
+}
+
+func (e *dockerExecutor) ImageLoaded(imageID string) bool {
+       _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID)
+       return err == nil
+}
+
+func (e *dockerExecutor) LoadImage(filename string) error {
+       f, err := os.Open(filename)
+       if err != nil {
+               return err
+       }
+       defer f.Close()
+       resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true)
+       if err != nil {
+               return fmt.Errorf("While loading container image into Docker: %v", err)
+       }
+       defer resp.Body.Close()
+       buf, _ := ioutil.ReadAll(resp.Body)
+       e.logf("loaded image: response %s", buf)
+       return nil
+}
+
+func (e *dockerExecutor) Create(spec containerSpec) error {
+       e.logf("Creating Docker container")
+       cfg := dockercontainer.Config{
+               Image:        spec.Image,
+               Cmd:          spec.Command,
+               WorkingDir:   spec.WorkingDir,
+               Volumes:      map[string]struct{}{},
+               OpenStdin:    spec.Stdin != nil,
+               StdinOnce:    spec.Stdin != nil,
+               AttachStdin:  spec.Stdin != nil,
+               AttachStdout: true,
+               AttachStderr: true,
+       }
+       if cfg.WorkingDir == "." {
+               cfg.WorkingDir = ""
+       }
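+       // Docker expects environment variables as a flat "NAME=value" list.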
+       for k, v := range spec.Env {
+               cfg.Env = append(cfg.Env, k+"="+v)
+       }
+       if spec.RAM < minDockerRAM {
+               spec.RAM = minDockerRAM
+       }
+       hostCfg := dockercontainer.HostConfig{
+               LogConfig: dockercontainer.LogConfig{
+                       Type: "none",
+               },
+               NetworkMode: dockercontainer.NetworkMode("none"),
+               Resources: dockercontainer.Resources{
+                       CgroupParent: spec.CgroupParent,
+                       NanoCPUs:     int64(spec.VCPUs) * 1000000000,
+                       Memory:       spec.RAM, // RAM
+                       MemorySwap:   spec.RAM, // RAM+swap
+                       KernelMemory: spec.RAM, // kernel portion
+               },
+       }
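+       // Render the bind mount map in docker's "hostpath:containerpath[:ro]" form.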
+       for path, mount := range spec.BindMounts {
+               bind := mount.HostPath + ":" + path
+               if mount.ReadOnly {
+                       bind += ":ro"
+               }
+               hostCfg.Binds = append(hostCfg.Binds, bind)
+       }
+       if spec.EnableNetwork {
+               hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode)
+       }
+
+       created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, e.containerUUID)
+       if err != nil {
+               return fmt.Errorf("While creating container: %v", err)
+       }
+       e.containerID = created.ID
+       return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr)
+}
+
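+// CgroupID returns the docker container ID, which docker also uses to name the container's cgroup.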
+func (e *dockerExecutor) CgroupID() string {
+       return e.containerID
+}
+
+func (e *dockerExecutor) Start() error {
+       return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockertypes.ContainerStartOptions{})
+}
+
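+// Stop force-removes the container; a "No such container" error is ignored because the container is already gone.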
+func (e *dockerExecutor) Stop() error {
+       err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
+       if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) {
+               err = nil
+       }
+       return err
+}
+
+// Wait for the container to terminate, capture the exit code, and
+// wait for stdout/stderr logging to finish.
+func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+       watchdogErr := make(chan error, 1)
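+       // Watchdog: poll ContainerInspect periodically so a vanished container or an
+       // unresponsive docker daemon is detected even if ContainerWait never returns.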
+       go func() {
+               ticker := time.NewTicker(e.watchdogInterval)
+               defer ticker.Stop()
+               for range ticker.C {
+                       dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval))
+                       ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID)
+                       dcancel()
+                       if ctx.Err() != nil {
+                               // Either the container already
+                               // exited, or our caller is trying to
+                               // kill it.
+                               return
+                       } else if err != nil {
+                               e.logf("Error inspecting container: %s", err)
+                               watchdogErr <- err
+                               return
+                       } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
+                               watchdogErr <- fmt.Errorf("Container is not running: State=%v", ctr.State)
+                               return
+                       }
+               }
+       }()
+
+       waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning)
+       for {
+               select {
+               case waitBody := <-waitOk:
+                       e.logf("Container exited with code: %v", waitBody.StatusCode)
+                       // wait for stdout/stderr to complete
+                       <-e.doneIO
+                       return int(waitBody.StatusCode), nil
+
+               case err := <-waitErr:
+                       return -1, fmt.Errorf("container wait: %v", err)
+
+               case <-ctx.Done():
+                       return -1, ctx.Err()
+
+               case err := <-watchdogErr:
+                       return -1, err
+               }
+       }
+}
+
+func (e *dockerExecutor) startIO(stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
+       resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
+               Stream: true,
+               Stdin:  stdin != nil,
+               Stdout: true,
+               Stderr: true,
+       })
+       if err != nil {
+               return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err)
+       }
+       var errStdin error
+       if stdin != nil {
+               go func() {
+                       errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite)
+               }()
+       }
+       e.doneIO = make(chan struct{})
+       go func() {
+               e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader)
+               if e.errIO == nil && errStdin != nil {
+                       e.errIO = errStdin
+               }
+               close(e.doneIO)
+       }()
+       return nil
+}
+
+func (e *dockerExecutor) handleStdin(stdin io.ReadCloser, conn io.Writer, closeConn func() error) error {
+       defer stdin.Close()
+       defer closeConn()
+       _, err := io.Copy(conn, stdin)
+       if err != nil {
+               return fmt.Errorf("While writing to docker container on stdin: %v", err)
+       }
+       return nil
+}
+
+// Handle docker log protocol; see
+// https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
+func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.WriteCloser, reader io.Reader) error {
+       header := make([]byte, 8)
+       var err error
+       for err == nil {
+               _, err = io.ReadAtLeast(reader, header, 8)
+               if err != nil {
+                       if err == io.EOF {
+                               err = nil
+                       }
+                       break
+               }
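+               // Each frame starts with an 8-byte header: byte 0 is the stream
+               // (1=stdout, otherwise stderr), bytes 4-7 are the big-endian payload size.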
+               readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
+               if header[0] == 1 {
+                       _, err = io.CopyN(stdout, reader, readsize)
+               } else {
+                       // stderr
+                       _, err = io.CopyN(stderr, reader, readsize)
+               }
+       }
+       if err != nil {
+               return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
+       }
+       err = stdout.Close()
+       if err != nil {
+               return fmt.Errorf("error writing stdout: close: %v", err)
+       }
+       err = stderr.Close()
+       if err != nil {
+               return fmt.Errorf("error writing stderr: close: %v", err)
+       }
+       return nil
+}
+
+func (e *dockerExecutor) Close() {
+       e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true})
+}
diff --git a/lib/crunchrun/docker_test.go b/lib/crunchrun/docker_test.go
new file mode 100644 (file)
index 0000000..28eb595
--- /dev/null
@@ -0,0 +1,31 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "os/exec"
+       "time"
+
+       . "gopkg.in/check.v1"
+)
+
+var _ = Suite(&dockerSuite{})
+
+type dockerSuite struct {
+       executorSuite
+}
+
+func (s *dockerSuite) SetUpSuite(c *C) {
+       _, err := exec.LookPath("docker")
+       if err != nil {
+               c.Skip("looks like docker is not installed")
+       }
+       s.newExecutor = func(c *C) {
+               exec.Command("docker", "rm", "zzzzz-zzzzz-zzzzzzzzzzzzzzz").Run()
+               var err error
+               s.executor, err = newDockerExecutor("zzzzz-zzzzz-zzzzzzzzzzzzzzz", c.Logf, time.Second/2)
+               c.Assert(err, IsNil)
+       }
+}
diff --git a/lib/crunchrun/executor.go b/lib/crunchrun/executor.go
new file mode 100644 (file)
index 0000000..c773feb
--- /dev/null
@@ -0,0 +1,63 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "io"
+
+       "golang.org/x/net/context"
+)
+
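+// bindmount describes the host side of a bind mount: the host path, and
+// whether the container's view of it is read-only.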
+type bindmount struct {
+       HostPath string
+       ReadOnly bool
+}
+
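+// containerSpec is a runtime-agnostic description of a container: image,
+// resource limits, mounts, command, networking, and attached stdio streams.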
+type containerSpec struct {
+       Image         string
+       VCPUs         int
+       RAM           int64
+       WorkingDir    string
+       Env           map[string]string
+       BindMounts    map[string]bindmount
+       Command       []string
+       EnableNetwork bool
+       NetworkMode   string // docker network mode, normally "default"
+       CgroupParent  string
+       Stdin         io.ReadCloser
+       Stdout        io.WriteCloser
+       Stderr        io.WriteCloser
+}
+
+// containerExecutor is an interface to a container runtime
+// (docker/singularity).
+type containerExecutor interface {
+       // ImageLoaded determines whether the given image is already
+       // available to use without calling LoadImage.
+       ImageLoaded(imageID string) bool
+
+       // LoadImage loads the image from the given tarball such that
+       // it can be used to create/start a container.
+       LoadImage(filename string) error
+
+       // Wait for the container process to finish, and return its
+       // exit code. If applicable, also remove the stopped container
+       // before returning.
+       Wait(context.Context) (int, error)
+
+       // Create a container, but don't start it yet.
+       Create(containerSpec) error
+
+       // Start the container
+       Start() error
+
+       // CgroupID returns the cgroup ID that the container's processes belong to
+       CgroupID() string
+
+       // Stop the container immediately
+       Stop() error
+
+       // Release resources (temp dirs, stopped containers)
+       Close()
+}
diff --git a/lib/crunchrun/executor_test.go b/lib/crunchrun/executor_test.go
new file mode 100644 (file)
index 0000000..4b6a4b1
--- /dev/null
@@ -0,0 +1,159 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "bytes"
+       "io"
+       "io/ioutil"
+       "net/http"
+       "os"
+       "strings"
+       "time"
+
+       "golang.org/x/net/context"
+       . "gopkg.in/check.v1"
+)
+
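+// busyboxDockerImage downloads the busybox image tarball used by these
+// tests, and returns the path of the local copy.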
+func busyboxDockerImage(c *C) string {
+       fnm := "busybox_uclibc.tar"
+       cachedir := c.MkDir()
+       cachefile := cachedir + "/" + fnm
+       if _, err := os.Stat(cachefile); err == nil {
+               return cachefile
+       }
+
+       f, err := ioutil.TempFile(cachedir, "")
+       c.Assert(err, IsNil)
+       defer f.Close()
+       defer os.Remove(f.Name())
+
+       resp, err := http.Get("https://cache.arvados.org/" + fnm)
+       c.Assert(err, IsNil)
+       defer resp.Body.Close()
+       _, err = io.Copy(f, resp.Body)
+       c.Assert(err, IsNil)
+       err = f.Close()
+       c.Assert(err, IsNil)
+       err = os.Rename(f.Name(), cachefile)
+       c.Assert(err, IsNil)
+
+       return cachefile
+}
+
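+// nopWriteCloser adds a no-op Close method so a bytes.Buffer can be used as
+// a container's stdout/stderr stream.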
+type nopWriteCloser struct{ io.Writer }
+
+func (nopWriteCloser) Close() error { return nil }
+
+// executorSuite is embedded by dockerSuite and singularitySuite so they can
+// share tests.
+type executorSuite struct {
+       newExecutor func(*C) // embedding struct's SetUpSuite method must set this
+       executor    containerExecutor
+       spec        containerSpec
+       stdout      bytes.Buffer
+       stderr      bytes.Buffer
+}
+
+func (s *executorSuite) SetUpTest(c *C) {
+       s.newExecutor(c)
+       s.stdout = bytes.Buffer{}
+       s.stderr = bytes.Buffer{}
+       s.spec = containerSpec{
+               Image:       "busybox:uclibc",
+               VCPUs:       1,
+               WorkingDir:  "",
+               Env:         map[string]string{"PATH": "/bin:/usr/bin"},
+               NetworkMode: "default",
+               Stdout:      nopWriteCloser{&s.stdout},
+               Stderr:      nopWriteCloser{&s.stderr},
+       }
+       err := s.executor.LoadImage(busyboxDockerImage(c))
+       c.Assert(err, IsNil)
+}
+
+func (s *executorSuite) TearDownTest(c *C) {
+       s.executor.Close()
+}
+
+func (s *executorSuite) TestExecTrivialContainer(c *C) {
+       s.spec.Command = []string{"echo", "ok"}
+       s.checkRun(c, 0)
+       c.Check(s.stdout.String(), Equals, "ok\n")
+       c.Check(s.stderr.String(), Equals, "")
+}
+
+func (s *executorSuite) TestExecStop(c *C) {
+       s.spec.Command = []string{"sh", "-c", "sleep 10; echo ok"}
+       err := s.executor.Create(s.spec)
+       c.Assert(err, IsNil)
+       err = s.executor.Start()
+       c.Assert(err, IsNil)
+       go func() {
+               time.Sleep(time.Second / 10)
+               s.executor.Stop()
+       }()
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+       defer cancel()
+       code, err := s.executor.Wait(ctx)
+       c.Check(code, Not(Equals), 0)
+       c.Check(err, IsNil)
+       c.Check(s.stdout.String(), Equals, "")
+       c.Check(s.stderr.String(), Equals, "")
+}
+
+func (s *executorSuite) TestExecCleanEnv(c *C) {
+       s.spec.Command = []string{"env"}
+       s.checkRun(c, 0)
+       c.Check(s.stderr.String(), Equals, "")
+       got := map[string]string{}
+       for _, kv := range strings.Split(s.stdout.String(), "\n") {
+               if kv == "" {
+                       continue
+               }
+               kv := strings.SplitN(kv, "=", 2)
+               switch kv[0] {
+               case "HOSTNAME", "HOME":
+                       // docker sets these by itself
+               case "LD_LIBRARY_PATH", "SINGULARITY_NAME", "PWD", "LANG", "SHLVL", "SINGULARITY_INIT", "SINGULARITY_CONTAINER":
+                       // singularity sets these by itself (cf. https://sylabs.io/guides/3.5/user-guide/environment_and_metadata.html)
+               case "PROMPT_COMMAND", "PS1", "SINGULARITY_APPNAME":
+                       // singularity also sets these by itself (as of v3.5.2)
+               default:
+                       got[kv[0]] = kv[1]
+               }
+       }
+       c.Check(got, DeepEquals, s.spec.Env)
+}
+
+func (s *executorSuite) TestExecEnableNetwork(c *C) {
+       for _, enable := range []bool{false, true} {
+               s.SetUpTest(c)
+               s.spec.Command = []string{"ip", "route"}
+               s.spec.EnableNetwork = enable
+               s.checkRun(c, 0)
+               if enable {
+                       c.Check(s.stdout.String(), Matches, "(?ms).*default via.*")
+               } else {
+                       c.Check(s.stdout.String(), Equals, "")
+               }
+       }
+}
+
+func (s *executorSuite) TestExecStdoutStderr(c *C) {
+       s.spec.Command = []string{"sh", "-c", "echo foo; echo -n bar >&2; echo baz; echo waz >&2"}
+       s.checkRun(c, 0)
+       c.Check(s.stdout.String(), Equals, "foo\nbaz\n")
+       c.Check(s.stderr.String(), Equals, "barwaz\n")
+}
+
+func (s *executorSuite) checkRun(c *C, expectCode int) {
+       c.Assert(s.executor.Create(s.spec), IsNil)
+       c.Assert(s.executor.Start(), IsNil)
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
+       defer cancel()
+       code, err := s.executor.Wait(ctx)
+       c.Assert(err, IsNil)
+       c.Check(code, Equals, expectCode)
+}
diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go
new file mode 100644 (file)
index 0000000..50136e5
--- /dev/null
@@ -0,0 +1,217 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "bytes"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "os"
+       "os/exec"
+       "strings"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
+       . "gopkg.in/check.v1"
+)
+
+var _ = Suite(&integrationSuite{})
+
+type integrationSuite struct {
+       engine string
+       image  arvados.Collection
+       input  arvados.Collection
+       stdin  bytes.Buffer
+       stdout bytes.Buffer
+       stderr bytes.Buffer
+       cr     arvados.ContainerRequest
+       client *arvados.Client
+       ac     *arvadosclient.ArvadosClient
+       kc     *keepclient.KeepClient
+}
+
+func (s *integrationSuite) SetUpSuite(c *C) {
+       _, err := exec.LookPath("docker")
+       if err != nil {
+               c.Skip("looks like docker is not installed")
+       }
+
+       arvadostest.StartKeep(2, true)
+
+       out, err := exec.Command("docker", "load", "--input", busyboxDockerImage(c)).CombinedOutput()
+       c.Log(string(out))
+       c.Assert(err, IsNil)
+       out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
+       imageUUID := strings.TrimSpace(string(out))
+       c.Logf("image uuid %s", imageUUID)
+       c.Assert(err, IsNil)
+       err = arvados.NewClientFromEnv().RequestAndDecode(&s.image, "GET", "arvados/v1/collections/"+imageUUID, nil, nil)
+       c.Assert(err, IsNil)
+       c.Logf("image pdh %s", s.image.PortableDataHash)
+
+       s.client = arvados.NewClientFromEnv()
+       s.ac, err = arvadosclient.New(s.client)
+       c.Assert(err, IsNil)
+       s.kc = keepclient.New(s.ac)
+       fs, err := s.input.FileSystem(s.client, s.kc)
+       c.Assert(err, IsNil)
+       f, err := fs.OpenFile("inputfile", os.O_CREATE|os.O_WRONLY, 0755)
+       c.Assert(err, IsNil)
+       _, err = f.Write([]byte("inputdata"))
+       c.Assert(err, IsNil)
+       err = f.Close()
+       c.Assert(err, IsNil)
+       s.input.ManifestText, err = fs.MarshalManifest(".")
+       c.Assert(err, IsNil)
+       err = s.client.RequestAndDecode(&s.input, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+               "ensure_unique_name": true,
+               "collection": map[string]interface{}{
+                       "manifest_text": s.input.ManifestText,
+               },
+       })
+       c.Assert(err, IsNil)
+       c.Logf("input pdh %s", s.input.PortableDataHash)
+}
+
+func (s *integrationSuite) TearDownSuite(c *C) {
+       err := s.client.RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+       c.Check(err, IsNil)
+}
+
+func (s *integrationSuite) SetUpTest(c *C) {
+       s.engine = "docker"
+       s.stdin = bytes.Buffer{}
+       s.stdout = bytes.Buffer{}
+       s.stderr = bytes.Buffer{}
+       s.cr = arvados.ContainerRequest{
+               Priority:       1,
+               State:          "Committed",
+               OutputPath:     "/mnt/out",
+               ContainerImage: s.image.PortableDataHash,
+               Mounts: map[string]arvados.Mount{
+                       "/mnt/json": {
+                               Kind: "json",
+                               Content: []interface{}{
+                                       "foo",
+                                       map[string]string{"foo": "bar"},
+                                       nil,
+                               },
+                       },
+                       "/mnt/in": {
+                               Kind:             "collection",
+                               PortableDataHash: s.input.PortableDataHash,
+                       },
+                       "/mnt/out": {
+                               Kind:     "tmp",
+                               Capacity: 1000,
+                       },
+               },
+               RuntimeConstraints: arvados.RuntimeConstraints{
+                       RAM:   128000000,
+                       VCPUs: 1,
+                       API:   true,
+               },
+       }
+}
+
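+// setup submits the container request to the API server and locks the
+// resulting container, as a dispatcher would.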
+func (s *integrationSuite) setup(c *C) {
+       err := s.client.RequestAndDecode(&s.cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{"container_request": map[string]interface{}{
+               "priority":            s.cr.Priority,
+               "state":               s.cr.State,
+               "command":             s.cr.Command,
+               "output_path":         s.cr.OutputPath,
+               "container_image":     s.cr.ContainerImage,
+               "mounts":              s.cr.Mounts,
+               "runtime_constraints": s.cr.RuntimeConstraints,
+               "use_existing":        false,
+       }})
+       c.Assert(err, IsNil)
+       c.Assert(s.cr.ContainerUUID, Not(Equals), "")
+       err = s.client.RequestAndDecode(nil, "POST", "arvados/v1/containers/"+s.cr.ContainerUUID+"/lock", nil, nil)
+       c.Assert(err, IsNil)
+}
+
+func (s *integrationSuite) TestRunTrivialContainerWithDocker(c *C) {
+       s.engine = "docker"
+       s.testRunTrivialContainer(c)
+}
+
+func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
+       s.engine = "singularity"
+       s.testRunTrivialContainer(c)
+}
+
+func (s *integrationSuite) testRunTrivialContainer(c *C) {
+       if err := exec.Command("which", s.engine).Run(); err != nil {
+               c.Skip(fmt.Sprintf("%s: %s", s.engine, err))
+       }
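+       // The container copies its input and JSON mounts to the output
+       // directory, verifies that the input mount is read-only, and
+       // creates an empty output subdirectory.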
+       s.cr.Command = []string{"sh", "-c", "cat /mnt/in/inputfile >/mnt/out/inputfile && cat /mnt/json >/mnt/out/json && ! touch /mnt/in/shouldbereadonly && mkdir /mnt/out/emptydir"}
+       s.setup(c)
+       code := command{}.RunCommand("crunch-run", []string{
+               "-runtime-engine=" + s.engine,
+               "-enable-memory-limit=false",
+               s.cr.ContainerUUID,
+       }, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+       c.Check(code, Equals, 0)
+       err := s.client.RequestAndDecode(&s.cr, "GET", "arvados/v1/container_requests/"+s.cr.UUID, nil, nil)
+       c.Assert(err, IsNil)
+       c.Logf("Finished container request: %#v", s.cr)
+
+       var log arvados.Collection
+       err = s.client.RequestAndDecode(&log, "GET", "arvados/v1/collections/"+s.cr.LogUUID, nil, nil)
+       c.Assert(err, IsNil)
+       fs, err := log.FileSystem(s.client, s.kc)
+       c.Assert(err, IsNil)
+       if d, err := fs.Open("/"); c.Check(err, IsNil) {
+               fis, err := d.Readdir(-1)
+               c.Assert(err, IsNil)
+               for _, fi := range fis {
+                       if fi.IsDir() {
+                               continue
+                       }
+                       f, err := fs.Open(fi.Name())
+                       c.Assert(err, IsNil)
+                       buf, err := ioutil.ReadAll(f)
+                       c.Assert(err, IsNil)
+                       c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
+               }
+       }
+
+       var output arvados.Collection
+       err = s.client.RequestAndDecode(&output, "GET", "arvados/v1/collections/"+s.cr.OutputUUID, nil, nil)
+       c.Assert(err, IsNil)
+       fs, err = output.FileSystem(s.client, s.kc)
+       c.Assert(err, IsNil)
+       if f, err := fs.Open("inputfile"); c.Check(err, IsNil) {
+               defer f.Close()
+               buf, err := ioutil.ReadAll(f)
+               c.Check(err, IsNil)
+               c.Check(string(buf), Equals, "inputdata")
+       }
+       if f, err := fs.Open("json"); c.Check(err, IsNil) {
+               defer f.Close()
+               buf, err := ioutil.ReadAll(f)
+               c.Check(err, IsNil)
+               c.Check(string(buf), Equals, `["foo",{"foo":"bar"},null]`)
+       }
+       if fi, err := fs.Stat("emptydir"); c.Check(err, IsNil) {
+               c.Check(fi.IsDir(), Equals, true)
+       }
+       if d, err := fs.Open("emptydir"); c.Check(err, IsNil) {
+               defer d.Close()
+               fis, err := d.Readdir(-1)
+               c.Assert(err, IsNil)
+               // crunch-run still saves a ".keep" file to preserve
+               // empty dirs even though that shouldn't be
+               // necessary. Ideally we would do:
+               // c.Check(fis, HasLen, 0)
+               for _, fi := range fis {
+                       c.Check(fi.Name(), Equals, ".keep")
+               }
+       }
+}
index e3fa3af0bb275279c0d3e5c234da1618b63b40ee..55460af379b3338423d9692e9a12dacc103a0591 100644 (file)
@@ -45,7 +45,7 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
 
@@ -74,7 +74,7 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
        cr.CrunchLog.Immediate = nil
@@ -97,7 +97,7 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        ts := &TestTimestamper{}
        cr.CrunchLog.Timestamper = ts.Timestamp
@@ -146,7 +146,7 @@ func (s *LoggingTestSuite) TestLogUpdate(c *C) {
                api := &ArvTestClient{}
                kc := &KeepTestClient{}
                defer kc.Close()
-               cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+               cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
                c.Assert(err, IsNil)
                ts := &TestTimestamper{}
                cr.CrunchLog.Timestamper = ts.Timestamp
@@ -197,7 +197,7 @@ func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
 
diff --git a/lib/crunchrun/singularity.go b/lib/crunchrun/singularity.go
new file mode 100644 (file)
index 0000000..4bec8c3
--- /dev/null
@@ -0,0 +1,148 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "io/ioutil"
+       "os"
+       "os/exec"
+       "syscall"
+
+       "golang.org/x/net/context"
+)
+
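+// singularityExecutor runs containers via the "singularity" command,
+// first converting the docker image tarball to a SIF file in a
+// dedicated temp directory.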
+type singularityExecutor struct {
+       logf          func(string, ...interface{})
+       spec          containerSpec
+       tmpdir        string
+       child         *exec.Cmd
+       imageFilename string // "sif" image
+}
+
+func newSingularityExecutor(logf func(string, ...interface{})) (*singularityExecutor, error) {
+       tmpdir, err := ioutil.TempDir("", "crunch-run-singularity-")
+       if err != nil {
+               return nil, err
+       }
+       return &singularityExecutor{
+               logf:   logf,
+               tmpdir: tmpdir,
+       }, nil
+}
+
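+// ImageLoaded always reports false: the SIF conversion is not cached
+// between runs, so callers proceed to LoadImage each time.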
+func (e *singularityExecutor) ImageLoaded(string) bool {
+       return false
+}
+
+// LoadImage satisfies the container executor interface, converting
+// the docker image tarball into a SIF file for later use.
+func (e *singularityExecutor) LoadImage(imageTarballPath string) error {
+       e.logf("building singularity image")
+       // "singularity build" does not accept a
+       // docker-archive://... filename containing a ":" character,
+       // as in "/path/to/sha256:abcd...1234.tar". Workaround: make a
+       // symlink that doesn't have ":" chars.
+       err := os.Symlink(imageTarballPath, e.tmpdir+"/image.tar")
+       if err != nil {
+               return err
+       }
+       e.imageFilename = e.tmpdir + "/image.sif"
+       build := exec.Command("singularity", "build", e.imageFilename, "docker-archive://"+e.tmpdir+"/image.tar")
+       e.logf("%v", build.Args)
+       out, err := build.CombinedOutput()
+       // INFO:    Starting build...
+       // Getting image source signatures
+       // Copying blob ab15617702de done
+       // Copying config 651e02b8a2 done
+       // Writing manifest to image destination
+       // Storing signatures
+       // 2021/04/22 14:42:14  info unpack layer: sha256:21cbfd3a344c52b197b9fa36091e66d9cbe52232703ff78d44734f85abb7ccd3
+       // INFO:    Creating SIF file...
+       // INFO:    Build complete: arvados-jobs.latest.sif
+       e.logf("%s", out)
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+func (e *singularityExecutor) Create(spec containerSpec) error {
+       e.spec = spec
+       return nil
+}
+
+func (e *singularityExecutor) Start() error {
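+       // --containall, --no-home, and --cleanenv isolate the container
+       // from the host's filesystem, home directory, and environment.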
+       args := []string{"singularity", "exec", "--containall", "--no-home", "--cleanenv"}
+       if !e.spec.EnableNetwork {
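+               // Join a new network namespace with no network
+               // configured, disabling connectivity in the container.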
+               args = append(args, "--net", "--network=none")
+       }
+       readonlyflag := map[bool]string{
+               false: "rw",
+               true:  "ro",
+       }
+       for path, mount := range e.spec.BindMounts {
+               args = append(args, "--bind", mount.HostPath+":"+path+":"+readonlyflag[mount.ReadOnly])
+       }
+       args = append(args, e.imageFilename)
+       args = append(args, e.spec.Command...)
+
+       // Environment variables are passed to the container via
+       // SINGULARITYENV_* variables: singularity strips the prefix
+       // and injects the rest into the container's environment.
+       // This is written for singularity 3.5.2; some of these
+       // behaviors change in singularity 3.6. See:
+       // https://sylabs.io/guides/3.7/user-guide/environment_and_metadata.html
+       // https://sylabs.io/guides/3.5/user-guide/environment_and_metadata.html
+       env := make([]string, 0, len(e.spec.Env))
+       for k, v := range e.spec.Env {
+               env = append(env, "SINGULARITYENV_"+k+"="+v)
+       }
+
+       path, err := exec.LookPath(args[0])
+       if err != nil {
+               return err
+       }
+       child := &exec.Cmd{
+               Path:   path,
+               Args:   args,
+               Env:    env,
+               Stdin:  e.spec.Stdin,
+               Stdout: e.spec.Stdout,
+               Stderr: e.spec.Stderr,
+       }
+       err = child.Start()
+       if err != nil {
+               return err
+       }
+       e.child = child
+       return nil
+}
+
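+// CgroupID returns "" because this executor does not create or track
+// a dedicated cgroup for the container.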
+func (e *singularityExecutor) CgroupID() string {
+       return ""
+}
+
+func (e *singularityExecutor) Stop() error {
+       if err := e.child.Process.Signal(syscall.Signal(0)); err != nil {
+               // process already exited
+               return nil
+       }
+       return e.child.Process.Signal(syscall.SIGKILL)
+}
+
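+// Wait returns the child's exit code once it finishes. The context
+// argument is currently ignored; a caller that needs to abort a
+// running container uses Stop instead.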
+func (e *singularityExecutor) Wait(context.Context) (int, error) {
+       err := e.child.Wait()
+       if err, ok := err.(*exec.ExitError); ok {
+               return err.ProcessState.ExitCode(), nil
+       }
+       if err != nil {
+               return 0, err
+       }
+       return e.child.ProcessState.ExitCode(), nil
+}
+
+func (e *singularityExecutor) Close() {
+       err := os.RemoveAll(e.tmpdir)
+       if err != nil {
+               e.logf("error removing temp dir: %s", err)
+       }
+}
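
Taken together, these methods give the executor a simple lifecycle:
LoadImage, Create, Start, Wait, Close. A minimal sketch of a caller
(paths and spec values are illustrative; assumes the crunchrun package
context plus "context" and "log" imports):

        e, err := newSingularityExecutor(log.Printf)
        if err != nil {
                log.Fatal(err)
        }
        defer e.Close()
        if err := e.LoadImage("/tmp/image.tar"); err != nil { // builds image.sif under e.tmpdir
                log.Fatal(err)
        }
        if err := e.Create(containerSpec{Command: []string{"echo", "hello"}}); err != nil {
                log.Fatal(err)
        }
        if err := e.Start(); err != nil {
                log.Fatal(err)
        }
        code, err := e.Wait(context.Background())
        if err != nil {
                log.Fatal(err)
        }
        log.Printf("container exited with code %d", code)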
diff --git a/lib/crunchrun/singularity_test.go b/lib/crunchrun/singularity_test.go
new file mode 100644 (file)
index 0000000..a1263da
--- /dev/null
@@ -0,0 +1,29 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "os/exec"
+
+       . "gopkg.in/check.v1"
+)
+
+var _ = Suite(&singularitySuite{})
+
+type singularitySuite struct {
+       executorSuite
+}
+
+func (s *singularitySuite) SetUpSuite(c *C) {
+       _, err := exec.LookPath("singularity")
+       if err != nil {
+               c.Skip("looks like singularity is not installed")
+       }
+       s.newExecutor = func(c *C) {
+               var err error
+               s.executor, err = newSingularityExecutor(c.Logf)
+               c.Assert(err, IsNil)
+       }
+}
index 8752ee054456bf1a2a2fc5b8030e8a7eeaa691b1..829a053636d5dc07abaac1c649810c5416e09fb6 100644 (file)
@@ -56,6 +56,7 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                        CrunchRunArgumentsList: []string{"--foo", "--extra='args'"},
                        DispatchPrivateKey:     string(dispatchprivraw),
                        StaleLockTimeout:       arvados.Duration(5 * time.Millisecond),
+                       RuntimeEngine:          "stub",
                        CloudVMs: arvados.CloudVMsConfig{
                                Driver:               "test",
                                SyncInterval:         arvados.Duration(10 * time.Millisecond),
@@ -163,7 +164,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
                stubvm.ExecuteContainer = executeContainer
                stubvm.CrashRunningContainer = finishContainer
-               stubvm.ExtraCrunchRunArgs = "'--foo' '--extra='\\''args'\\'''"
+               stubvm.ExtraCrunchRunArgs = "'--runtime-engine=stub' '--foo' '--extra='\\''args'\\'''"
                switch n % 7 {
                case 0:
                        stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
index 7289179fd6e4526ecfc7204d970172b42018af59..a5924cf997f7b4bc9d838134054be58b3bd25127 100644 (file)
@@ -122,7 +122,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                installPublicKey:               installPublicKey,
                tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
                runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
-               runnerArgs:                     cluster.Containers.CrunchRunArgumentsList,
+               runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
                stop:                           make(chan bool),
        }
        wp.registerMetrics(reg)
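
The net effect, as asserted in the dispatcher test above, is that every
crunch-run invocation now leads with the configured engine. A worked
sketch of the argument assembly (config values are illustrative):

        engine := "singularity"                      // cluster.Containers.RuntimeEngine
        extra := []string{"--foo", "--extra='args'"} // cluster.Containers.CrunchRunArgumentsList
        args := append([]string{"--runtime-engine=" + engine}, extra...)
        // args: ["--runtime-engine=singularity" "--foo" "--extra='args'"]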
index 8df3fba532a59559fe6ce5e17eacfa5bd41956c3..255e56aaf527592db01a45d41c206469885e0eae 100644 (file)
@@ -181,6 +181,11 @@ func (inst *installCommand) RunCommand(prog string, args []string, stdin io.Read
                        "wget",
                        "xvfb",
                )
+               if dev || test {
+                       pkgs = append(pkgs,
+                               "squashfs-tools", // for singularity
+                       )
+               }
                switch {
                case osv.Debian && osv.Major >= 10:
                        pkgs = append(pkgs, "libcurl4")
@@ -315,6 +320,28 @@ rm ${zip}
                        }
                }
 
+               singularityversion := "3.5.2"
+               if havesingularityversion, err := exec.Command("/var/lib/arvados/bin/singularity", "--version").CombinedOutput(); err == nil && strings.Contains(string(havesingularityversion), singularityversion) {
+                       logger.Print("singularity " + singularityversion + " already installed")
+               } else if dev || test {
+                       err = inst.runBash(`
+S=`+singularityversion+`
+tmp=/var/lib/arvados/tmp/singularity
+trap "rm -r ${tmp}" ERR EXIT
+cd /var/lib/arvados/tmp
+git clone https://github.com/sylabs/singularity
+cd singularity
+git checkout v${S}
+./mconfig --prefix=/var/lib/arvados
+make -C ./builddir
+make -C ./builddir install
+rm -r ${tmp}
+`, stdout, stderr)
+                       if err != nil {
+                               return 1
+                       }
+               }
+
                // The entry in /etc/locale.gen is "en_US.UTF-8"; once
                // it's installed, locale -a reports it as
                // "en_US.utf8".
index 65e2ff5381e84e9ab4259e0b54b3d033e20d9508..b1ee6e9b992f4446ec1f0f826c22e9bdb244704b 100644 (file)
@@ -416,6 +416,7 @@ type ContainersConfig struct {
        StaleLockTimeout            Duration
        SupportedDockerImageFormats StringSet
        UsePreemptibleInstances     bool
+       RuntimeEngine               string
 
        JobsAPI struct {
                Enable         string
index aeb5a47e6d0559df094ee3cbec5432d3b3b8f2ce..16e20981651d035e09ce8d10ab179d3a50fbe891 100644 (file)
@@ -96,6 +96,9 @@ const (
 
        LogCollectionUUID  = "zzzzz-4zz18-logcollection01"
        LogCollectionUUID2 = "zzzzz-4zz18-logcollection02"
+
+       DockerImage112PDH      = "d740a57097711e08eb9b2a93518f20ab+174"
+       DockerImage112Filename = "sha256:d8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678.tar"
 )
 
 // PathologicalManifest : A valid manifest designed to test