From 83d6cc94a72c1be8a371976ca6198abfb8bfc5a9 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 17 May 2021 17:06:41 -0400 Subject: [PATCH] 17296: Disentangle docker-specific code, add singularity option. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/config/config.default.yml | 3 + lib/config/generated_config.go | 3 + lib/crunchrun/copier.go | 9 +- lib/crunchrun/copier_test.go | 4 +- lib/crunchrun/crunchrun.go | 708 ++++++--------- lib/crunchrun/crunchrun_test.go | 1184 ++++++++++---------------- lib/crunchrun/docker.go | 263 ++++++ lib/crunchrun/docker_test.go | 31 + lib/crunchrun/executor.go | 63 ++ lib/crunchrun/executor_test.go | 161 ++++ lib/crunchrun/logging_test.go | 10 +- lib/crunchrun/singularity.go | 140 +++ lib/crunchrun/singularity_test.go | 29 + lib/dispatchcloud/dispatcher_test.go | 3 +- lib/dispatchcloud/worker/pool.go | 2 +- lib/install/deps.go | 27 + sdk/go/arvados/config.go | 1 + sdk/go/arvadostest/fixtures.go | 3 + 18 files changed, 1436 insertions(+), 1208 deletions(-) create mode 100644 lib/crunchrun/docker.go create mode 100644 lib/crunchrun/docker_test.go create mode 100644 lib/crunchrun/executor.go create mode 100644 lib/crunchrun/executor_test.go create mode 100644 lib/crunchrun/singularity.go create mode 100644 lib/crunchrun/singularity_test.go diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml index 50a965a9aa..c94b4ee9a1 100644 --- a/lib/config/config.default.yml +++ b/lib/config/config.default.yml @@ -862,6 +862,9 @@ Clusters: # Minimum time between two attempts to run the same container MinRetryPeriod: 0s + # Container runtime: "docker" (default) or "singularity" (experimental) + RuntimeEngine: docker + Logging: # When you run the db:delete_old_container_logs task, it will find # containers that have been finished for at least this many seconds, diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go index 5216f81616..e344d22665 100644 --- a/lib/config/generated_config.go +++ b/lib/config/generated_config.go @@ -868,6 +868,9 @@ Clusters: # Minimum time between two attempts to run the same container MinRetryPeriod: 0s + # Container runtime: "docker" (default) or "singularity" (experimental) + RuntimeEngine: docker + Logging: # When you run the db:delete_old_container_logs task, it will find # containers that have been finished for at least this many seconds, diff --git a/lib/crunchrun/copier.go b/lib/crunchrun/copier.go index 1b0f168b88..78e5f4faa8 100644 --- a/lib/crunchrun/copier.go +++ b/lib/crunchrun/copier.go @@ -55,7 +55,7 @@ type copier struct { keepClient IKeepClient hostOutputDir string ctrOutputDir string - binds []string + bindmounts map[string]bindmount mounts map[string]arvados.Mount secretMounts map[string]arvados.Mount logger printfer @@ -341,11 +341,8 @@ func (cp *copier) hostRoot(ctrRoot string) (string, error) { if ctrRoot == cp.ctrOutputDir { return cp.hostOutputDir, nil } - for _, bind := range cp.binds { - tokens := strings.Split(bind, ":") - if len(tokens) >= 2 && tokens[1] == ctrRoot { - return tokens[0], nil - } + if mnt, ok := cp.bindmounts[ctrRoot]; ok { + return mnt.HostPath, nil } return "", fmt.Errorf("not bind-mounted: %q", ctrRoot) } diff --git a/lib/crunchrun/copier_test.go b/lib/crunchrun/copier_test.go index 777b715d76..9135bcaa1d 100644 --- a/lib/crunchrun/copier_test.go +++ b/lib/crunchrun/copier_test.go @@ -128,7 +128,9 @@ func (s *copierSuite) TestSymlinkToMountedCollection(c *check.C) { PortableDataHash: arvadostest.FooCollectionPDH, Writable: true, } - s.cp.binds = 
append(s.cp.binds, bindtmp+":/mnt-w") + s.cp.bindmounts = map[string]bindmount{ + "/mnt-w": bindmount{HostPath: bindtmp, ReadOnly: false}, + } c.Assert(os.Symlink("../../mnt", s.cp.hostOutputDir+"/l_dir"), check.IsNil) c.Assert(os.Symlink("/mnt/foo", s.cp.hostOutputDir+"/l_file"), check.IsNil) diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 88c137277c..4bdaca4d64 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -34,11 +34,6 @@ import ( "git.arvados.org/arvados.git/sdk/go/keepclient" "git.arvados.org/arvados.git/sdk/go/manifest" "golang.org/x/net/context" - - dockertypes "github.com/docker/docker/api/types" - dockercontainer "github.com/docker/docker/api/types/container" - dockernetwork "github.com/docker/docker/api/types/network" - dockerclient "github.com/docker/docker/client" ) type command struct{} @@ -74,20 +69,6 @@ type RunArvMount func(args []string, tok string) (*exec.Cmd, error) type MkTempDir func(string, string) (string, error) -// ThinDockerClient is the minimal Docker client interface used by crunch-run. -type ThinDockerClient interface { - ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) - ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, - networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) - ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error - ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error - ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) - ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error) - ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) - ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) - ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) -} - type PsProcess interface { CmdlineSlice() ([]string, error) } @@ -95,7 +76,7 @@ type PsProcess interface { // ContainerRunner is the main stateful struct used for a single execution of a // container. type ContainerRunner struct { - Docker ThinDockerClient + executor containerExecutor // Dispatcher client is initialized with the Dispatcher token. 
// This is a privileged token used to manage container status @@ -119,35 +100,30 @@ type ContainerRunner struct { ContainerArvClient IArvadosClient ContainerKeepClient IKeepClient - Container arvados.Container - ContainerConfig dockercontainer.Config - HostConfig dockercontainer.HostConfig - token string - ContainerID string - ExitCode *int - NewLogWriter NewLogWriter - loggingDone chan bool - CrunchLog *ThrottledLogger - Stdout io.WriteCloser - Stderr io.WriteCloser - logUUID string - logMtx sync.Mutex - LogCollection arvados.CollectionFileSystem - LogsPDH *string - RunArvMount RunArvMount - MkTempDir MkTempDir - ArvMount *exec.Cmd - ArvMountPoint string - HostOutputDir string - Binds []string - Volumes map[string]struct{} - OutputPDH *string - SigChan chan os.Signal - ArvMountExit chan error - SecretMounts map[string]arvados.Mount - MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) - finalState string - parentTemp string + Container arvados.Container + token string + ExitCode *int + NewLogWriter NewLogWriter + CrunchLog *ThrottledLogger + Stdout io.WriteCloser + Stderr io.WriteCloser + logUUID string + logMtx sync.Mutex + LogCollection arvados.CollectionFileSystem + LogsPDH *string + RunArvMount RunArvMount + MkTempDir MkTempDir + ArvMount *exec.Cmd + ArvMountPoint string + HostOutputDir string + Volumes map[string]struct{} + OutputPDH *string + SigChan chan os.Signal + ArvMountExit chan error + SecretMounts map[string]arvados.Mount + MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) + finalState string + parentTemp string statLogger io.WriteCloser statReporter *crunchstat.Reporter @@ -171,10 +147,9 @@ type ContainerRunner struct { cStateLock sync.Mutex cCancelled bool // StopContainer() invoked - cRemoved bool // docker confirmed the container no longer exists enableNetwork string // one of "default" or "always" - networkMode string // passed through to HostConfig.NetworkMode + networkMode string // "none", "host", or "" -- passed through to executor arvMountLog *ThrottledLogger containerWatchdogInterval time.Duration @@ -182,8 +157,9 @@ type ContainerRunner struct { gateway Gateway } -// setupSignals sets up signal handling to gracefully terminate the underlying -// Docker container and update state when receiving a TERM, INT or QUIT signal. +// setupSignals sets up signal handling to gracefully terminate the +// underlying container and update state when receiving a TERM, INT or +// QUIT signal. func (runner *ContainerRunner) setupSignals() { runner.SigChan = make(chan os.Signal, 1) signal.Notify(runner.SigChan, syscall.SIGTERM) @@ -197,24 +173,18 @@ func (runner *ContainerRunner) setupSignals() { }(runner.SigChan) } -// stop the underlying Docker container. +// stop the underlying container. 
func (runner *ContainerRunner) stop(sig os.Signal) { runner.cStateLock.Lock() defer runner.cStateLock.Unlock() if sig != nil { runner.CrunchLog.Printf("caught signal: %v", sig) } - if runner.ContainerID == "" { - return - } runner.cCancelled = true - runner.CrunchLog.Printf("removing container") - err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true}) + runner.CrunchLog.Printf("stopping container") + err := runner.executor.Stop() if err != nil { - runner.CrunchLog.Printf("error removing container: %s", err) - } - if err == nil || strings.Contains(err.Error(), "No such container: "+runner.ContainerID) { - runner.cRemoved = true + runner.CrunchLog.Printf("error stopping container: %s", err) } } @@ -262,57 +232,44 @@ func (runner *ContainerRunner) checkBrokenNode(goterr error) bool { // LoadImage determines the docker image id from the container record and // checks if it is available in the local Docker image store. If not, it loads // the image from Keep. -func (runner *ContainerRunner) LoadImage() (err error) { - +func (runner *ContainerRunner) LoadImage() (string, error) { runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage) - var collection arvados.Collection - err = runner.ContainerArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection) + d, err := os.Open(runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage) + if err != nil { + return "", err + } + defer d.Close() + allfiles, err := d.Readdirnames(-1) if err != nil { - return fmt.Errorf("While getting container image collection: %v", err) + return "", err } - manifest := manifest.Manifest{Text: collection.ManifestText} - var img, imageID string - for ms := range manifest.StreamIter() { - img = ms.FileStreamSegments[0].Name - if !strings.HasSuffix(img, ".tar") { - return fmt.Errorf("First file in the container image collection does not end in .tar") + var tarfiles []string + for _, fnm := range allfiles { + if strings.HasSuffix(fnm, ".tar") { + tarfiles = append(tarfiles, fnm) } - imageID = img[:len(img)-4] } + if len(tarfiles) == 0 { + return "", fmt.Errorf("image collection does not include a .tar image file") + } + if len(tarfiles) > 1 { + return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles) + } + imageID := tarfiles[0][:len(tarfiles[0])-4] + imageFile := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + tarfiles[0] + runner.CrunchLog.Printf("Using Docker image id %q", imageID) - runner.CrunchLog.Printf("Using Docker image id '%s'", imageID) - - _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID) - if err != nil { + if !runner.executor.ImageLoaded(imageID) { runner.CrunchLog.Print("Loading Docker image from keep") - - var readCloser io.ReadCloser - readCloser, err = runner.ContainerKeepClient.ManifestFileReader(manifest, img) + err = runner.executor.LoadImage(imageFile) if err != nil { - return fmt.Errorf("While creating ManifestFileReader for container image: %v", err) + return "", err } - - response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true) - if err != nil { - return fmt.Errorf("While loading container image into Docker: %v", err) - } - - defer response.Body.Close() - rbody, err := ioutil.ReadAll(response.Body) - if err != nil { - return fmt.Errorf("Reading response to image load: %v", err) - } - runner.CrunchLog.Printf("Docker response: %s", rbody) } else { 
runner.CrunchLog.Print("Docker image is available") } - - runner.ContainerConfig.Image = imageID - - runner.ContainerKeepClient.ClearBlockCache() - - return nil + return imageID, nil } func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) { @@ -418,15 +375,16 @@ func copyfile(src string, dst string) (err error) { return nil } -func (runner *ContainerRunner) SetupMounts() (err error) { - err = runner.SetupArvMountPoint("keep") +func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { + bindmounts := map[string]bindmount{} + err := runner.SetupArvMountPoint("keep") if err != nil { - return fmt.Errorf("While creating keep mount temp dir: %v", err) + return nil, fmt.Errorf("While creating keep mount temp dir: %v", err) } token, err := runner.ContainerToken() if err != nil { - return fmt.Errorf("could not get container token: %s", err) + return nil, fmt.Errorf("could not get container token: %s", err) } pdhOnly := true @@ -442,8 +400,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } collectionPaths := []string{} - runner.Binds = nil - runner.Volumes = make(map[string]struct{}) needCertMount := true type copyFile struct { src string @@ -457,11 +413,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } for bind := range runner.SecretMounts { if _, ok := runner.Container.Mounts[bind]; ok { - return fmt.Errorf("secret mount %q conflicts with regular mount", bind) + return nil, fmt.Errorf("secret mount %q conflicts with regular mount", bind) } if runner.SecretMounts[bind].Kind != "json" && runner.SecretMounts[bind].Kind != "text" { - return fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted", + return nil, fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted", bind, runner.SecretMounts[bind].Kind) } binds = append(binds, bind) @@ -476,7 +432,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if bind == "stdout" || bind == "stderr" { // Is it a "file" mount kind? if mnt.Kind != "file" { - return fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind) + return nil, fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind) } // Does path start with OutputPath? @@ -485,14 +441,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) { prefix += "/" } if !strings.HasPrefix(mnt.Path, prefix) { - return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix) + return nil, fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix) } } if bind == "stdin" { // Is it a "collection" mount kind? 
if mnt.Kind != "collection" && mnt.Kind != "json" { - return fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind) + return nil, fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind) } } @@ -502,7 +458,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" { if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" { - return fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind) + return nil, fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind) } } @@ -510,17 +466,17 @@ func (runner *ContainerRunner) SetupMounts() (err error) { case mnt.Kind == "collection" && bind != "stdin": var src string if mnt.UUID != "" && mnt.PortableDataHash != "" { - return fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount") + return nil, fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount") } if mnt.UUID != "" { if mnt.Writable { - return fmt.Errorf("writing to existing collections currently not permitted") + return nil, fmt.Errorf("writing to existing collections currently not permitted") } pdhOnly = false src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID) } else if mnt.PortableDataHash != "" { if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") { - return fmt.Errorf("can never write to a collection specified by portable data hash") + return nil, fmt.Errorf("can never write to a collection specified by portable data hash") } idx := strings.Index(mnt.PortableDataHash, "/") if idx > 0 { @@ -546,14 +502,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if mnt.Writable { if bind == runner.Container.OutputPath { runner.HostOutputDir = src - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind)) + bindmounts[bind] = bindmount{HostPath: src} } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") { copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]}) } else { - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind)) + bindmounts[bind] = bindmount{HostPath: src} } } else { - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind)) + bindmounts[bind] = bindmount{HostPath: src, ReadOnly: true} } collectionPaths = append(collectionPaths, src) @@ -561,17 +517,17 @@ func (runner *ContainerRunner) SetupMounts() (err error) { var tmpdir string tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp") if err != nil { - return fmt.Errorf("while creating mount temp dir: %v", err) + return nil, fmt.Errorf("while creating mount temp dir: %v", err) } st, staterr := os.Stat(tmpdir) if staterr != nil { - return fmt.Errorf("while Stat on temp dir: %v", staterr) + return nil, fmt.Errorf("while Stat on temp dir: %v", staterr) } err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777) if staterr != nil { - return fmt.Errorf("while Chmod temp dir: %v", err) + return nil, fmt.Errorf("while Chmod temp dir: %v", err) } - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind)) + bindmounts[bind] = bindmount{HostPath: tmpdir} if bind == runner.Container.OutputPath { runner.HostOutputDir = tmpdir } @@ 
-581,53 +537,53 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if mnt.Kind == "json" { filedata, err = json.Marshal(mnt.Content) if err != nil { - return fmt.Errorf("encoding json data: %v", err) + return nil, fmt.Errorf("encoding json data: %v", err) } } else { text, ok := mnt.Content.(string) if !ok { - return fmt.Errorf("content for mount %q must be a string", bind) + return nil, fmt.Errorf("content for mount %q must be a string", bind) } filedata = []byte(text) } tmpdir, err := runner.MkTempDir(runner.parentTemp, mnt.Kind) if err != nil { - return fmt.Errorf("creating temp dir: %v", err) + return nil, fmt.Errorf("creating temp dir: %v", err) } tmpfn := filepath.Join(tmpdir, "mountdata."+mnt.Kind) err = ioutil.WriteFile(tmpfn, filedata, 0444) if err != nil { - return fmt.Errorf("writing temp file: %v", err) + return nil, fmt.Errorf("writing temp file: %v", err) } if strings.HasPrefix(bind, runner.Container.OutputPath+"/") { copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]}) } else { - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind)) + bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true} } case mnt.Kind == "git_tree": tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree") if err != nil { - return fmt.Errorf("creating temp dir: %v", err) + return nil, fmt.Errorf("creating temp dir: %v", err) } err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token) if err != nil { - return err + return nil, err } - runner.Binds = append(runner.Binds, tmpdir+":"+bind+":ro") + bindmounts[bind] = bindmount{HostPath: tmpdir, ReadOnly: true} } } if runner.HostOutputDir == "" { - return fmt.Errorf("output path does not correspond to a writable mount point") + return nil, fmt.Errorf("output path does not correspond to a writable mount point") } if needCertMount && runner.Container.RuntimeConstraints.API { for _, certfile := range arvadosclient.CertFiles { _, err := os.Stat(certfile) if err == nil { - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile)) + bindmounts["/etc/arvados/ca-certificates.crt"] = bindmount{HostPath: certfile, ReadOnly: true} break } } @@ -642,20 +598,20 @@ func (runner *ContainerRunner) SetupMounts() (err error) { runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token) if err != nil { - return fmt.Errorf("while trying to start arv-mount: %v", err) + return nil, fmt.Errorf("while trying to start arv-mount: %v", err) } for _, p := range collectionPaths { _, err = os.Stat(p) if err != nil { - return fmt.Errorf("while checking that input files exist: %v", err) + return nil, fmt.Errorf("while checking that input files exist: %v", err) } } for _, cp := range copyFiles { st, err := os.Stat(cp.src) if err != nil { - return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err) + return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err) } if st.IsDir() { err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error { @@ -686,59 +642,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } } if err != nil { - return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err) + return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err) } } - return nil -} - -func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) { - // Handle docker 
log protocol - // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container - defer close(runner.loggingDone) - - header := make([]byte, 8) - var err error - for err == nil { - _, err = io.ReadAtLeast(containerReader, header, 8) - if err != nil { - if err == io.EOF { - err = nil - } - break - } - readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24) - if header[0] == 1 { - // stdout - _, err = io.CopyN(runner.Stdout, containerReader, readsize) - } else { - // stderr - _, err = io.CopyN(runner.Stderr, containerReader, readsize) - } - } - - if err != nil { - runner.CrunchLog.Printf("error reading docker logs: %v", err) - } - - err = runner.Stdout.Close() - if err != nil { - runner.CrunchLog.Printf("error closing stdout logs: %v", err) - } - - err = runner.Stderr.Close() - if err != nil { - runner.CrunchLog.Printf("error closing stderr logs: %v", err) - } - - if runner.statReporter != nil { - runner.statReporter.Stop() - err = runner.statLogger.Close() - if err != nil { - runner.CrunchLog.Printf("error closing crunchstat logs: %v", err) - } - } + return bindmounts, nil } func (runner *ContainerRunner) stopHoststat() error { @@ -775,7 +683,7 @@ func (runner *ContainerRunner) startCrunchstat() error { } runner.statLogger = NewThrottledLogger(w) runner.statReporter = &crunchstat.Reporter{ - CID: runner.ContainerID, + CID: runner.executor.CgroupID(), Logger: log.New(runner.statLogger, "", 0), CgroupParent: runner.expectCgroupParent, CgroupRoot: runner.cgroupRoot, @@ -938,102 +846,6 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str return true, nil } -// AttachStreams connects the docker container stdin, stdout and stderr logs -// to the Arvados logger which logs to Keep and the API server logs table. 
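Reviewer note: the hand-rolled stream demultiplexer deleted above (and the AttachStreams plumbing deleted just below) parsed Docker's 8-byte attach-frame header by hand. The new lib/crunchrun/docker.go appears in the diffstat but not in this excerpt; presumably it can lean on Docker's own stdcopy helper for the same job. A minimal, self-contained sketch of that approach — the capture-file name is hypothetical, and this is not claimed to be the patch's actual docker.go:

```go
package main

import (
	"io"
	"os"

	"github.com/docker/docker/pkg/stdcopy"
)

// demuxAttachStream splits a multiplexed Docker attach/log stream into
// separate stdout and stderr writers. stdcopy.StdCopy parses the same
// [stream-type, 0, 0, 0, big-endian size] frame header that the deleted
// ProcessDockerAttach decoded byte-by-byte.
func demuxAttachStream(src io.Reader, stdout, stderr io.Writer) error {
	_, err := stdcopy.StdCopy(stdout, stderr, src)
	return err
}

func main() {
	// Example: demux a previously captured attach stream (hypothetical file).
	f, err := os.Open("attach-stream.bin")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if err := demuxAttachStream(f, os.Stdout, os.Stderr); err != nil {
		panic(err)
	}
}
```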
-func (runner *ContainerRunner) AttachStreams() (err error) { - - runner.CrunchLog.Print("Attaching container streams") - - // If stdin mount is provided, attach it to the docker container - var stdinRdr arvados.File - var stdinJSON []byte - if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok { - if stdinMnt.Kind == "collection" { - var stdinColl arvados.Collection - collID := stdinMnt.UUID - if collID == "" { - collID = stdinMnt.PortableDataHash - } - err = runner.ContainerArvClient.Get("collections", collID, nil, &stdinColl) - if err != nil { - return fmt.Errorf("While getting stdin collection: %v", err) - } - - stdinRdr, err = runner.ContainerKeepClient.ManifestFileReader( - manifest.Manifest{Text: stdinColl.ManifestText}, - stdinMnt.Path) - if os.IsNotExist(err) { - return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path) - } else if err != nil { - return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err) - } - } else if stdinMnt.Kind == "json" { - stdinJSON, err = json.Marshal(stdinMnt.Content) - if err != nil { - return fmt.Errorf("While encoding stdin json data: %v", err) - } - } - } - - stdinUsed := stdinRdr != nil || len(stdinJSON) != 0 - response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID, - dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true}) - if err != nil { - return fmt.Errorf("While attaching container stdout/stderr streams: %v", err) - } - - runner.loggingDone = make(chan bool) - - if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok { - stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path) - if err != nil { - return err - } - runner.Stdout = stdoutFile - } else if w, err := runner.NewLogWriter("stdout"); err != nil { - return err - } else { - runner.Stdout = NewThrottledLogger(w) - } - - if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok { - stderrFile, err := runner.getStdoutFile(stderrMnt.Path) - if err != nil { - return err - } - runner.Stderr = stderrFile - } else if w, err := runner.NewLogWriter("stderr"); err != nil { - return err - } else { - runner.Stderr = NewThrottledLogger(w) - } - - if stdinRdr != nil { - go func() { - _, err := io.Copy(response.Conn, stdinRdr) - if err != nil { - runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err) - runner.stop(nil) - } - stdinRdr.Close() - response.CloseWrite() - }() - } else if len(stdinJSON) != 0 { - go func() { - _, err := io.Copy(response.Conn, bytes.NewReader(stdinJSON)) - if err != nil { - runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err) - runner.stop(nil) - } - response.CloseWrite() - }() - } - - go runner.ProcessDockerAttach(response.Reader) - - return nil -} - func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { stdoutPath := mntPath[len(runner.Container.OutputPath):] index := strings.LastIndex(stdoutPath, "/") @@ -1060,86 +872,107 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { } // CreateContainer creates the docker container. -func (runner *ContainerRunner) CreateContainer() error { - runner.CrunchLog.Print("Creating Docker container") - - runner.ContainerConfig.Cmd = runner.Container.Command - if runner.Container.Cwd != "." 
{ - runner.ContainerConfig.WorkingDir = runner.Container.Cwd +func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error { + var stdin io.ReadCloser + if mnt, ok := runner.Container.Mounts["stdin"]; ok { + switch mnt.Kind { + case "collection": + var collID string + if mnt.UUID != "" { + collID = mnt.UUID + } else { + collID = mnt.PortableDataHash + } + path := runner.ArvMountPoint + "/by_id/" + collID + "/" + mnt.Path + f, err := os.Open(path) + if err != nil { + return err + } + stdin = f + case "json": + j, err := json.Marshal(mnt.Content) + if err != nil { + return fmt.Errorf("error encoding stdin json data: %v", err) + } + stdin = ioutil.NopCloser(bytes.NewReader(j)) + default: + return fmt.Errorf("stdin mount has unsupported kind %q", mnt.Kind) + } } - for k, v := range runner.Container.Environment { - runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v) + var stdout, stderr io.WriteCloser + if mnt, ok := runner.Container.Mounts["stdout"]; ok { + f, err := runner.getStdoutFile(mnt.Path) + if err != nil { + return err + } + stdout = f + } else if w, err := runner.NewLogWriter("stdout"); err != nil { + return err + } else { + stdout = NewThrottledLogger(w) } - runner.ContainerConfig.Volumes = runner.Volumes - - maxRAM := int64(runner.Container.RuntimeConstraints.RAM) - minDockerRAM := int64(16) - if maxRAM < minDockerRAM*1024*1024 { - // Docker daemon won't let you set a limit less than ~10 MiB - maxRAM = minDockerRAM * 1024 * 1024 - } - runner.HostConfig = dockercontainer.HostConfig{ - Binds: runner.Binds, - LogConfig: dockercontainer.LogConfig{ - Type: "none", - }, - Resources: dockercontainer.Resources{ - CgroupParent: runner.setCgroupParent, - NanoCPUs: int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000, - Memory: maxRAM, // RAM - MemorySwap: maxRAM, // RAM+swap - KernelMemory: maxRAM, // kernel portion - }, + if mnt, ok := runner.Container.Mounts["stderr"]; ok { + f, err := runner.getStdoutFile(mnt.Path) + if err != nil { + return err + } + stderr = f + } else if w, err := runner.NewLogWriter("stderr"); err != nil { + return err + } else { + stderr = NewThrottledLogger(w) } + env := runner.Container.Environment + enableNetwork := runner.enableNetwork == "always" if runner.Container.RuntimeConstraints.API { + enableNetwork = true tok, err := runner.ContainerToken() if err != nil { return err } - runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, - "ARVADOS_API_TOKEN="+tok, - "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"), - "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"), - ) - runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode) - } else { - if runner.enableNetwork == "always" { - runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode) - } else { - runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none") - } - } - - _, stdinUsed := runner.Container.Mounts["stdin"] - runner.ContainerConfig.OpenStdin = stdinUsed - runner.ContainerConfig.StdinOnce = stdinUsed - runner.ContainerConfig.AttachStdin = stdinUsed - runner.ContainerConfig.AttachStdout = true - runner.ContainerConfig.AttachStderr = true - - createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID) - if err != nil { - return fmt.Errorf("While creating container: %v", err) - } - - runner.ContainerID = createdBody.ID - - return runner.AttachStreams() + env = map[string]string{} + 
for k, v := range runner.Container.Environment { + env[k] = v + } + env["ARVADOS_API_TOKEN"] = tok + env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST") + env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE") + } + workdir := runner.Container.Cwd + if workdir == "." { + // both "" and "." mean default + workdir = "" + } + + return runner.executor.Create(containerSpec{ + Image: imageID, + VCPUs: runner.Container.RuntimeConstraints.VCPUs, + RAM: runner.Container.RuntimeConstraints.RAM, + WorkingDir: workdir, + Env: env, + BindMounts: bindmounts, + Command: runner.Container.Command, + EnableNetwork: enableNetwork, + NetworkMode: runner.networkMode, + CgroupParent: runner.setCgroupParent, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + }) } // StartContainer starts the docker container created by CreateContainer. func (runner *ContainerRunner) StartContainer() error { - runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID) + runner.CrunchLog.Printf("Starting container") runner.cStateLock.Lock() defer runner.cStateLock.Unlock() if runner.cCancelled { return ErrCancelled } - err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID, - dockertypes.ContainerStartOptions{}) + err := runner.executor.Start() if err != nil { var advice string if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil { @@ -1153,71 +986,39 @@ func (runner *ContainerRunner) StartContainer() error { // WaitFinish waits for the container to terminate, capture the exit code, and // close the stdout/stderr logging. func (runner *ContainerRunner) WaitFinish() error { - var runTimeExceeded <-chan time.Time runner.CrunchLog.Print("Waiting for container to finish") - - waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning) - arvMountExit := runner.ArvMountExit - if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 { - runTimeExceeded = time.After(time.Duration(timeout) * time.Second) + var timeout <-chan time.Time + if s := runner.Container.SchedulingParameters.MaxRunTime; s > 0 { + timeout = time.After(time.Duration(s) * time.Second) } - - containerGone := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { - defer close(containerGone) - if runner.containerWatchdogInterval < 1 { - runner.containerWatchdogInterval = time.Minute - } - for range time.NewTicker(runner.containerWatchdogInterval).C { - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval)) - ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID) - cancel() - runner.cStateLock.Lock() - done := runner.cRemoved || runner.ExitCode != nil - runner.cStateLock.Unlock() - if done { - return - } else if err != nil { - runner.CrunchLog.Printf("Error inspecting container: %s", err) - runner.checkBrokenNode(err) - return - } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") { - runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State) - return - } - } - }() - - for { select { - case waitBody := <-waitOk: - runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode) - code := int(waitBody.StatusCode) - runner.ExitCode = &code - - // wait for stdout/stderr to complete - <-runner.loggingDone - return nil - - case err := <-waitErr: - return fmt.Errorf("container 
wait: %v", err) - - case <-arvMountExit: - runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.") - runner.stop(nil) - // arvMountExit will always be ready now that - // it's closed, but that doesn't interest us. - arvMountExit = nil - - case <-runTimeExceeded: + case <-timeout: runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.") runner.stop(nil) - runTimeExceeded = nil + case <-runner.ArvMountExit: + runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.") + runner.stop(nil) + case <-ctx.Done(): + } + }() + exitcode, err := runner.executor.Wait(ctx) + if err != nil { + runner.checkBrokenNode(err) + return err + } + runner.ExitCode = &exitcode - case <-containerGone: - return errors.New("docker client never returned status") + if runner.statReporter != nil { + runner.statReporter.Stop() + err = runner.statLogger.Close() + if err != nil { + runner.CrunchLog.Printf("error closing crunchstat logs: %v", err) } } + return nil } func (runner *ContainerRunner) updateLogs() { @@ -1270,7 +1071,7 @@ func (runner *ContainerRunner) updateLogs() { // CaptureOutput saves data from the container's output directory if // needed, and updates the container output accordingly. -func (runner *ContainerRunner) CaptureOutput() error { +func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error { if runner.Container.RuntimeConstraints.API { // Output may have been set directly by the container, so // refresh the container record to check. @@ -1292,7 +1093,7 @@ func (runner *ContainerRunner) CaptureOutput() error { keepClient: runner.ContainerKeepClient, hostOutputDir: runner.HostOutputDir, ctrOutputDir: runner.Container.OutputPath, - binds: runner.Binds, + bindmounts: bindmounts, mounts: runner.Container.Mounts, secretMounts: runner.SecretMounts, logger: runner.CrunchLog, @@ -1568,6 +1369,7 @@ func (runner *ContainerRunner) Run() (err error) { return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State) } + var bindmounts map[string]bindmount defer func() { // checkErr prints e (unless it's nil) and sets err to // e (unless err is already non-nil). 
Thus, if err @@ -1602,7 +1404,9 @@ func (runner *ContainerRunner) Run() (err error) { // capture partial output and write logs } - checkErr("CaptureOutput", runner.CaptureOutput()) + if bindmounts != nil { + checkErr("CaptureOutput", runner.CaptureOutput(bindmounts)) + } checkErr("stopHoststat", runner.stopHoststat()) checkErr("CommitLogs", runner.CommitLogs()) checkErr("UpdateContainerFinal", runner.UpdateContainerFinal()) @@ -1614,8 +1418,16 @@ func (runner *ContainerRunner) Run() (err error) { return } + // set up FUSE mount and binds + bindmounts, err = runner.SetupMounts() + if err != nil { + runner.finalState = "Cancelled" + err = fmt.Errorf("While setting up mounts: %v", err) + return + } + // check for and/or load image - err = runner.LoadImage() + imageID, err := runner.LoadImage() if err != nil { if !runner.checkBrokenNode(err) { // Failed to load image but not due to a "broken node" @@ -1626,15 +1438,7 @@ func (runner *ContainerRunner) Run() (err error) { return } - // set up FUSE mount and binds - err = runner.SetupMounts() - if err != nil { - runner.finalState = "Cancelled" - err = fmt.Errorf("While setting up mounts: %v", err) - return - } - - err = runner.CreateContainer() + err = runner.CreateContainer(imageID, bindmounts) if err != nil { return } @@ -1727,14 +1531,12 @@ func (runner *ContainerRunner) fetchContainerRecord() error { func NewContainerRunner(dispatcherClient *arvados.Client, dispatcherArvClient IArvadosClient, dispatcherKeepClient IKeepClient, - docker ThinDockerClient, containerUUID string) (*ContainerRunner, error) { cr := &ContainerRunner{ dispatcherClient: dispatcherClient, DispatcherArvClient: dispatcherArvClient, DispatcherKeepClient: dispatcherKeepClient, - Docker: docker, } cr.NewLogWriter = cr.NewArvLogWriter cr.RunArvMount = cr.ArvMountCmd @@ -1784,15 +1586,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)") kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID") list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes") - enableNetwork := flags.String("container-enable-networking", "default", - `Specify if networking should be enabled for container. One of 'default', 'always': - default: only enable networking if container requests it. - always: containers always have networking enabled - `) - networkMode := flags.String("container-network-mode", "default", - `Set networking mode for container. Corresponds to Docker network mode (--net). - `) + enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)") + networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`) memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container") + runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity") flags.Duration("check-containerd", 0, "Ignored. 
Exists for compatibility with older versions.") ignoreDetachFlag := false @@ -1825,18 +1622,18 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } } - containerID := flags.Arg(0) + containerUUID := flags.Arg(0) switch { case *detach && !ignoreDetachFlag: - return Detach(containerID, prog, args, os.Stdout, os.Stderr) + return Detach(containerUUID, prog, args, os.Stdout, os.Stderr) case *kill >= 0: - return KillProcess(containerID, syscall.Signal(*kill), os.Stdout, os.Stderr) + return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr) case *list: return ListProcesses(os.Stdout, os.Stderr) } - if containerID == "" { + if containerUUID == "" { log.Printf("usage: %s [options] UUID", prog) return 1 } @@ -1850,45 +1647,60 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s api, err := arvadosclient.MakeArvadosClient() if err != nil { - log.Printf("%s: %v", containerID, err) + log.Printf("%s: %v", containerUUID, err) return 1 } api.Retries = 8 kc, kcerr := keepclient.MakeKeepClient(api) if kcerr != nil { - log.Printf("%s: %v", containerID, kcerr) + log.Printf("%s: %v", containerUUID, kcerr) return 1 } kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} kc.Retries = 4 - // API version 1.21 corresponds to Docker 1.9, which is currently the - // minimum version we want to support. - docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) - - cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID) + cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID) if err != nil { log.Print(err) return 1 } - if dockererr != nil { - cr.CrunchLog.Printf("%s: %v", containerID, dockererr) - cr.checkBrokenNode(dockererr) + + switch *runtimeEngine { + case "docker": + cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval) + case "singularity": + cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf) + default: + cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine) cr.CrunchLog.Close() return 1 } - - cr.gateway = Gateway{ - Address: os.Getenv("GatewayAddress"), - AuthSecret: os.Getenv("GatewayAuthSecret"), - ContainerUUID: containerID, - DockerContainerID: &cr.ContainerID, - Log: cr.CrunchLog, - ContainerIPAddress: dockerContainerIPAddress(&cr.ContainerID), + if err != nil { + cr.CrunchLog.Printf("%s: %v", containerUUID, err) + cr.checkBrokenNode(err) + cr.CrunchLog.Close() + return 1 } + defer cr.executor.Close() + + gwAuthSecret := os.Getenv("GatewayAuthSecret") os.Unsetenv("GatewayAuthSecret") - if cr.gateway.Address != "" { + if gwAuthSecret == "" { + // not safe to run a gateway service without an auth + // secret + } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" { + // dispatcher did not tell us which external IP + // address to advertise --> no gateway service + } else if de, ok := cr.executor.(*dockerExecutor); ok { + cr.gateway = Gateway{ + Address: gwListen, + AuthSecret: gwAuthSecret, + ContainerUUID: containerUUID, + DockerContainerID: &de.containerID, + Log: cr.CrunchLog, + ContainerIPAddress: dockerContainerIPAddress(&de.containerID), + } err = cr.gateway.Start() if err != nil { log.Printf("error starting gateway server: %s", err) @@ -1896,9 +1708,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } } - parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".") + 
parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerUUID+".") if tmperr != nil { - log.Printf("%s: %v", containerID, tmperr) + log.Printf("%s: %v", containerUUID, tmperr) return 1 } @@ -1932,7 +1744,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } if runerr != nil { - log.Printf("%s: %v", containerID, runerr) + log.Printf("%s: %v", containerUUID, runerr) return 1 } return 0 diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go index dbdaa6293d..249d7f1bde 100644 --- a/lib/crunchrun/crunchrun_test.go +++ b/lib/crunchrun/crunchrun_test.go @@ -5,7 +5,6 @@ package crunchrun import ( - "bufio" "bytes" "crypto/md5" "encoding/json" @@ -13,11 +12,10 @@ import ( "fmt" "io" "io/ioutil" - "net" "os" "os/exec" + "regexp" "runtime/pprof" - "sort" "strings" "sync" "syscall" @@ -30,9 +28,6 @@ import ( "git.arvados.org/arvados.git/sdk/go/manifest" "golang.org/x/net/context" - dockertypes "github.com/docker/docker/api/types" - dockercontainer "github.com/docker/docker/api/types/container" - dockernetwork "github.com/docker/docker/api/types/network" . "gopkg.in/check.v1" ) @@ -41,18 +36,43 @@ func TestCrunchExec(t *testing.T) { TestingT(t) } -// Gocheck boilerplate var _ = Suite(&TestSuite{}) type TestSuite struct { - client *arvados.Client - docker *TestDockerClient - runner *ContainerRunner + client *arvados.Client + api *ArvTestClient + runner *ContainerRunner + executor *stubExecutor + keepmount string } func (s *TestSuite) SetUpTest(c *C) { + *brokenNodeHook = "" s.client = arvados.NewClientFromEnv() - s.docker = NewTestDockerClient() + s.executor = &stubExecutor{} + var err error + s.api = &ArvTestClient{} + s.runner, err = NewContainerRunner(s.client, s.api, &KeepTestClient{}, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + c.Assert(err, IsNil) + s.runner.executor = s.executor + s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) { + return s.api, &KeepTestClient{}, s.client, nil + } + s.runner.RunArvMount = func(cmd []string, tok string) (*exec.Cmd, error) { + s.runner.ArvMountPoint = s.keepmount + return nil, nil + } + s.keepmount = c.MkDir() + err = os.Mkdir(s.keepmount+"/by_id", 0755) + c.Assert(err, IsNil) + err = os.Mkdir(s.keepmount+"/by_id/"+arvadostest.DockerImage112PDH, 0755) + c.Assert(err, IsNil) + err = ioutil.WriteFile(s.keepmount+"/by_id/"+arvadostest.DockerImage112PDH+"/"+arvadostest.DockerImage112Filename, []byte("#notarealtarball"), 0644) + err = os.Mkdir(s.keepmount+"/by_id/"+fakeInputCollectionPDH, 0755) + c.Assert(err, IsNil) + err = ioutil.WriteFile(s.keepmount+"/by_id/"+fakeInputCollectionPDH+"/input.json", []byte(`{"input":true}`), 0644) + c.Assert(err, IsNil) + s.runner.ArvMountPoint = s.keepmount } type ArvTestClient struct { @@ -72,6 +92,38 @@ type KeepTestClient struct { Content []byte } +type stubExecutor struct { + imageLoaded bool + loaded string + loadErr error + exitCode int + createErr error + created containerSpec + startErr error + waitSleep time.Duration + waitErr error + stopErr error + stopped bool + closed bool + runFunc func() + exit chan int +} + +func (e *stubExecutor) ImageLoaded(imageID string) bool { return e.imageLoaded } +func (e *stubExecutor) LoadImage(filename string) error { e.loaded = filename; return e.loadErr } +func (e *stubExecutor) Create(spec containerSpec) error { e.created = spec; return e.createErr } +func (e *stubExecutor) Start() error { e.exit = make(chan int, 1); go e.runFunc(); return e.startErr } +func (e *stubExecutor) CgroupID() 
string { return "cgroupid" } +func (e *stubExecutor) Stop() error { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr } +func (e *stubExecutor) Close() { e.closed = true } +func (e *stubExecutor) Wait(context.Context) (int, error) { + defer e.created.Stdout.Close() + defer e.created.Stderr.Close() + return <-e.exit, e.waitErr +} + +const fakeInputCollectionPDH = "ffffffffaaaaaaaa88888888eeeeeeee+1234" + var hwManifest = ". 82ab40c24fc8df01798e57ba66795bb1+841216+Aa124ac75e5168396c73c0a18eda641a4f41791c0@569fa8c3 0:841216:9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7.tar\n" var hwPDH = "a45557269dcb65a6b78f9ac061c0850b+120" var hwImageID = "9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7" @@ -92,129 +144,6 @@ var denormalizedWithSubdirsPDH = "b0def87f80dd594d4675809e83bd4f15+367" var fakeAuthUUID = "zzzzz-gj3su-55pqoyepgi2glem" var fakeAuthToken = "a3ltuwzqcu2u4sc0q7yhpc2w7s00fdcqecg5d6e0u3pfohmbjt" -type TestDockerClient struct { - imageLoaded string - logReader io.ReadCloser - logWriter io.WriteCloser - fn func(t *TestDockerClient) - exitCode int - stop chan bool - cwd string - env []string - api *ArvTestClient - realTemp string - calledWait bool - ctrExited bool -} - -func NewTestDockerClient() *TestDockerClient { - t := &TestDockerClient{} - t.logReader, t.logWriter = io.Pipe() - t.stop = make(chan bool, 1) - t.cwd = "/" - return t -} - -type MockConn struct { - net.Conn -} - -func (m *MockConn) Write(b []byte) (int, error) { - return len(b), nil -} - -func NewMockConn() *MockConn { - c := &MockConn{} - return c -} - -func (t *TestDockerClient) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) { - return dockertypes.HijackedResponse{Conn: NewMockConn(), Reader: bufio.NewReader(t.logReader)}, nil -} - -func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) { - if config.WorkingDir != "" { - t.cwd = config.WorkingDir - } - t.env = config.Env - return dockercontainer.ContainerCreateCreatedBody{ID: "abcde"}, nil -} - -func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error { - if t.exitCode == 3 { - return errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`) - } - if t.exitCode == 4 { - return errors.New(`panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`) - } - if t.exitCode == 5 { - return errors.New(`Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`) - } - if t.exitCode == 6 { - return errors.New(`Error response from daemon: Cannot start container 
58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`) - } - - if container == "abcde" { - // t.fn gets executed in ContainerWait - return nil - } - return errors.New("Invalid container id") -} - -func (t *TestDockerClient) ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error { - t.stop <- true - return nil -} - -func (t *TestDockerClient) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) { - t.calledWait = true - body := make(chan dockercontainer.ContainerWaitOKBody, 1) - err := make(chan error) - go func() { - t.fn(t) - body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.exitCode)} - }() - return body, err -} - -func (t *TestDockerClient) ContainerInspect(ctx context.Context, id string) (c dockertypes.ContainerJSON, err error) { - c.ContainerJSONBase = &dockertypes.ContainerJSONBase{} - c.ID = "abcde" - if t.ctrExited { - c.State = &dockertypes.ContainerState{Status: "exited", Dead: true} - } else { - c.State = &dockertypes.ContainerState{Status: "running", Pid: 1234, Running: true} - } - return -} - -func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) { - if t.exitCode == 2 { - return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?") - } - - if t.imageLoaded == image { - return dockertypes.ImageInspect{}, nil, nil - } - return dockertypes.ImageInspect{}, nil, errors.New("") -} - -func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) { - if t.exitCode == 2 { - return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. 
Is the docker daemon running?") - } - _, err := io.Copy(ioutil.Discard, input) - if err != nil { - return dockertypes.ImageLoadResponse{}, err - } - t.imageLoaded = hwImageID - return dockertypes.ImageLoadResponse{Body: ioutil.NopCloser(input)}, nil -} - -func (*TestDockerClient) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) { - return nil, nil -} - func (client *ArvTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error { @@ -287,7 +216,7 @@ func (client *ArvTestClient) CallRaw(method, resourceType, uuid, action string, } else { j = []byte(`{ "command": ["sleep", "1"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "` + arvadostest.DockerImage112PDH + `", "cwd": ".", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"}, "/json": {"kind": "json", "content": {"number": 123456789123456789}}}, @@ -438,49 +367,45 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s } func (s *TestSuite) TestLoadImage(c *C) { - cr, err := NewContainerRunner(s.client, &ArvTestClient{}, - &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") - c.Assert(err, IsNil) - - kc := &KeepTestClient{} - defer kc.Close() - cr.ContainerArvClient = &ArvTestClient{} - cr.ContainerKeepClient = kc - - _, err = cr.Docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{}) - c.Check(err, IsNil) - - _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageID) - c.Check(err, NotNil) - - cr.Container.ContainerImage = hwPDH - - // (1) Test loading image from keep - c.Check(kc.Called, Equals, false) - c.Check(cr.ContainerConfig.Image, Equals, "") - - err = cr.LoadImage() - - c.Check(err, IsNil) - defer func() { - cr.Docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{}) - }() + s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH + s.runner.Container.Mounts = map[string]arvados.Mount{ + "/out": {Kind: "tmp", Writable: true}, + } + s.runner.Container.OutputPath = "/out" - c.Check(kc.Called, Equals, true) - c.Check(cr.ContainerConfig.Image, Equals, hwImageID) + _, err := s.runner.SetupMounts() + c.Assert(err, IsNil) - _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageID) + imageID, err := s.runner.LoadImage() c.Check(err, IsNil) - - // (2) Test using image that's already loaded - kc.Called = false - cr.ContainerConfig.Image = "" - - err = cr.LoadImage() + c.Check(s.executor.loaded, Matches, ".*"+regexp.QuoteMeta(arvadostest.DockerImage112Filename)) + c.Check(imageID, Equals, strings.TrimSuffix(arvadostest.DockerImage112Filename, ".tar")) + + s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH + s.executor.imageLoaded = false + s.executor.loaded = "" + s.executor.loadErr = errors.New("bork") + imageID, err = s.runner.LoadImage() + c.Check(err, ErrorMatches, ".*bork") + c.Check(s.executor.loaded, Matches, ".*"+regexp.QuoteMeta(arvadostest.DockerImage112Filename)) + + s.runner.Container.ContainerImage = fakeInputCollectionPDH + s.executor.imageLoaded = false + s.executor.loaded = "" + s.executor.loadErr = nil + imageID, err = s.runner.LoadImage() + c.Check(err, ErrorMatches, "image collection does not include a \\.tar image file") + c.Check(s.executor.loaded, Equals, "") + + // if executor reports image is already loaded, LoadImage should not be called + s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH + s.executor.imageLoaded = true + s.executor.loaded = "" + 
s.executor.loadErr = nil + imageID, err = s.runner.LoadImage() c.Check(err, IsNil) - c.Check(kc.Called, Equals, false) - c.Check(cr.ContainerConfig.Image, Equals, hwImageID) - + c.Check(s.executor.loaded, Equals, "") + c.Check(imageID, Equals, strings.TrimSuffix(arvadostest.DockerImage112Filename, ".tar")) } type ArvErrorTestClient struct{} @@ -555,65 +480,6 @@ func (KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename return ErrorReader{}, nil } -func (s *TestSuite) TestLoadImageArvError(c *C) { - // (1) Arvados error - kc := &KeepTestClient{} - defer kc.Close() - cr, err := NewContainerRunner(s.client, &ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") - c.Assert(err, IsNil) - - cr.ContainerArvClient = &ArvErrorTestClient{} - cr.ContainerKeepClient = &KeepTestClient{} - - cr.Container.ContainerImage = hwPDH - - err = cr.LoadImage() - c.Check(err.Error(), Equals, "While getting container image collection: ArvError") -} - -func (s *TestSuite) TestLoadImageKeepError(c *C) { - // (2) Keep error - kc := &KeepErrorTestClient{} - cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") - c.Assert(err, IsNil) - - cr.ContainerArvClient = &ArvTestClient{} - cr.ContainerKeepClient = &KeepErrorTestClient{} - - cr.Container.ContainerImage = hwPDH - - err = cr.LoadImage() - c.Assert(err, NotNil) - c.Check(err.Error(), Equals, "While creating ManifestFileReader for container image: KeepError") -} - -func (s *TestSuite) TestLoadImageCollectionError(c *C) { - // (3) Collection doesn't contain image - kc := &KeepReadErrorTestClient{} - cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") - c.Assert(err, IsNil) - cr.Container.ContainerImage = otherPDH - - cr.ContainerArvClient = &ArvTestClient{} - cr.ContainerKeepClient = &KeepReadErrorTestClient{} - - err = cr.LoadImage() - c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar") -} - -func (s *TestSuite) TestLoadImageKeepReadError(c *C) { - // (4) Collection doesn't contain image - kc := &KeepReadErrorTestClient{} - cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") - c.Assert(err, IsNil) - cr.Container.ContainerImage = hwPDH - cr.ContainerArvClient = &ArvTestClient{} - cr.ContainerKeepClient = &KeepReadErrorTestClient{} - - err = cr.LoadImage() - c.Check(err, NotNil) -} - type ClosableBuffer struct { bytes.Buffer } @@ -647,35 +513,31 @@ func dockerLog(fd byte, msg string) []byte { } func (s *TestSuite) TestRunContainer(c *C) { - s.docker.fn = func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, "Hello world\n")) - t.logWriter.Close() + s.executor.runFunc = func() { + fmt.Fprintf(s.executor.created.Stdout, "Hello world\n") + s.executor.created.Stdout.Close() + s.executor.created.Stderr.Close() + s.executor.exit <- 0 } - kc := &KeepTestClient{} - defer kc.Close() - cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") - c.Assert(err, IsNil) - - cr.ContainerArvClient = &ArvTestClient{} - cr.ContainerKeepClient = &KeepTestClient{} var logs TestLogs - cr.NewLogWriter = logs.NewTestLoggingWriter - cr.Container.ContainerImage = hwPDH - cr.Container.Command = []string{"./hw"} - err = cr.LoadImage() - c.Check(err, IsNil) + s.runner.NewLogWriter = logs.NewTestLoggingWriter + s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH + s.runner.Container.Command = []string{"./hw"} - err = 
cr.CreateContainer() - c.Check(err, IsNil) + imageID, err := s.runner.LoadImage() + c.Assert(err, IsNil) - err = cr.StartContainer() - c.Check(err, IsNil) + err = s.runner.CreateContainer(imageID, nil) + c.Assert(err, IsNil) - err = cr.WaitFinish() - c.Check(err, IsNil) + err = s.runner.StartContainer() + c.Assert(err, IsNil) + + err = s.runner.WaitFinish() + c.Assert(err, IsNil) - c.Check(strings.HasSuffix(logs.Stdout.String(), "Hello world\n"), Equals, true) + c.Check(logs.Stdout.String(), Matches, ".*Hello world\n") c.Check(logs.Stderr.String(), Equals, "") } @@ -683,7 +545,7 @@ func (s *TestSuite) TestCommitLogs(c *C) { api := &ArvTestClient{} kc := &KeepTestClient{} defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp @@ -705,7 +567,7 @@ func (s *TestSuite) TestUpdateContainerRunning(c *C) { api := &ArvTestClient{} kc := &KeepTestClient{} defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) err = cr.UpdateContainerRunning() @@ -718,7 +580,7 @@ func (s *TestSuite) TestUpdateContainerComplete(c *C) { api := &ArvTestClient{} kc := &KeepTestClient{} defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) cr.LogsPDH = new(string) @@ -740,7 +602,7 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) { api := &ArvTestClient{} kc := &KeepTestClient{} defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) cr.cCancelled = true cr.finalState = "Cancelled" @@ -755,10 +617,10 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) { // Used by the TestFullRun*() test below to DRY up boilerplate setup to do full // dress rehearsal of the Run() function, starting from a JSON container record. 
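+// The stub executor runs fn in place of a real container and then reports exitCode; +// on return, callers can make assertions against the suite's stub API client (s.api), +// the runner (s.runner), and the temp dir backing the container's mounts (realTemp).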
-func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) { - rec := arvados.Container{} - err := json.Unmarshal([]byte(record), &rec) - c.Check(err, IsNil) +func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func()) (*ArvTestClient, *ContainerRunner, string) { + err := json.Unmarshal([]byte(record), &s.api.Container) + c.Assert(err, IsNil) + initialState := s.api.Container.State var sm struct { SecretMounts map[string]arvados.Mount `json:"secret_mounts"` @@ -766,33 +628,22 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi err = json.Unmarshal([]byte(record), &sm) c.Check(err, IsNil) secretMounts, err := json.Marshal(sm) - c.Logf("%s %q", sm, secretMounts) - c.Check(err, IsNil) - - s.docker.exitCode = exitCode - s.docker.fn = fn - s.docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{}) - - api = &ArvTestClient{Container: rec} - s.docker.api = api - kc := &KeepTestClient{} - defer kc.Close() - cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) - s.runner = cr - cr.statInterval = 100 * time.Millisecond - cr.containerWatchdogInterval = time.Second - am := &ArvMountCmdLine{} - cr.RunArvMount = am.ArvMountTest + c.Logf("SecretMounts decoded %v json %q", sm, secretMounts) - realTemp, err = ioutil.TempDir("", "crunchrun_test1-") - c.Assert(err, IsNil) - defer os.RemoveAll(realTemp) + s.executor.runFunc = func() { + fn() + s.executor.exit <- exitCode + } - s.docker.realTemp = realTemp + s.runner.statInterval = 100 * time.Millisecond + s.runner.containerWatchdogInterval = time.Second + am := &ArvMountCmdLine{} + s.runner.RunArvMount = am.ArvMountTest + realTemp := c.MkDir() tempcount := 0 - cr.MkTempDir = func(_ string, prefix string) (string, error) { + s.runner.MkTempDir = func(_, prefix string) (string, error) { tempcount++ d := fmt.Sprintf("%s/%s%d", realTemp, prefix, tempcount) err := os.Mkdir(d, os.ModePerm) @@ -802,73 +653,80 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi } return d, err } - cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) { + s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) { return &ArvTestClient{secretMounts: secretMounts}, &KeepTestClient{}, nil, nil } if extraMounts != nil && len(extraMounts) > 0 { - err := cr.SetupArvMountPoint("keep") + err := s.runner.SetupArvMountPoint("keep") c.Check(err, IsNil) for _, m := range extraMounts { - os.MkdirAll(cr.ArvMountPoint+"/by_id/"+m, os.ModePerm) + os.MkdirAll(s.runner.ArvMountPoint+"/by_id/"+m, os.ModePerm) } } - err = cr.Run() - if api.CalledWith("container.state", "Complete") != nil { + err = s.runner.Run() + if s.api.CalledWith("container.state", "Complete") != nil { c.Check(err, IsNil) } - if exitCode != 2 { - c.Check(api.WasSetRunning, Equals, true) + if s.executor.loadErr == nil && s.executor.createErr == nil && initialState != "Running" { + c.Check(s.api.WasSetRunning, Equals, true) var lastupdate arvadosclient.Dict - for _, content := range api.Content { + for _, content := range s.api.Content { if content["container"] != nil { lastupdate = content["container"].(arvadosclient.Dict) } } if lastupdate["log"] == nil { - c.Errorf("no container update with non-nil log -- updates were: %v", api.Content) + c.Errorf("no container update 
with non-nil log -- updates were: %v", s.api.Content) } } if err != nil { - for k, v := range api.Logs { + for k, v := range s.api.Logs { c.Log(k) c.Log(v.String()) } } - return + return s.api, s.runner, realTemp } func (s *TestSuite) TestFullRunHello(c *C) { - api, _, _ := s.fullRunHelper(c, `{ + s.runner.networkMode = "default" + s.fullRunHelper(c, `{ "command": ["echo", "hello world"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": ".", - "environment": {}, + "environment": {"foo":"bar","baz":"waz"}, "mounts": {"/tmp": {"kind": "tmp"} }, "output_path": "/tmp", "priority": 1, - "runtime_constraints": {}, + "runtime_constraints": {"vcpus":1,"ram":1000000}, "state": "Locked" -}`, nil, 0, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, "hello world\n")) - t.logWriter.Close() +}`, nil, 0, func() { + c.Check(s.executor.created.Command, DeepEquals, []string{"echo", "hello world"}) + c.Check(s.executor.created.Image, Equals, "sha256:d8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678") + c.Check(s.executor.created.Env, DeepEquals, map[string]string{"foo": "bar", "baz": "waz"}) + c.Check(s.executor.created.VCPUs, Equals, 1) + c.Check(s.executor.created.RAM, Equals, int64(1000000)) + c.Check(s.executor.created.NetworkMode, Equals, "default") + c.Check(s.executor.created.EnableNetwork, Equals, false) + fmt.Fprintln(s.executor.created.Stdout, "hello world") }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) - c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello world\n"), Equals, true) + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) + c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello world\n") } func (s *TestSuite) TestRunAlreadyRunning(c *C) { var ran bool - api, _, _ := s.fullRunHelper(c, `{ + s.fullRunHelper(c, `{ "command": ["sleep", "3"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": ".", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -877,19 +735,18 @@ func (s *TestSuite) TestRunAlreadyRunning(c *C) { "runtime_constraints": {}, "scheduling_parameters":{"max_run_time": 1}, "state": "Running" -}`, nil, 2, func(t *TestDockerClient) { +}`, nil, 2, func() { ran = true }) - - c.Check(api.CalledWith("container.state", "Cancelled"), IsNil) - c.Check(api.CalledWith("container.state", "Complete"), IsNil) + c.Check(s.api.CalledWith("container.state", "Cancelled"), IsNil) + c.Check(s.api.CalledWith("container.state", "Complete"), IsNil) c.Check(ran, Equals, false) } func (s *TestSuite) TestRunTimeExceeded(c *C) { - api, _, _ := s.fullRunHelper(c, `{ + s.fullRunHelper(c, `{ "command": ["sleep", "3"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": ".", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -898,38 +755,35 @@ func (s *TestSuite) TestRunTimeExceeded(c *C) { "runtime_constraints": {}, "scheduling_parameters":{"max_run_time": 1}, "state": "Locked" -}`, nil, 0, func(t *TestDockerClient) { +}`, nil, 0, func() { time.Sleep(3 * time.Second) - t.logWriter.Close() }) - c.Check(api.CalledWith("container.state", "Cancelled"), NotNil) - c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*") + 
c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) + c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*") } func (s *TestSuite) TestContainerWaitFails(c *C) { - api, _, _ := s.fullRunHelper(c, `{ + s.fullRunHelper(c, `{ "command": ["sleep", "3"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": ".", "mounts": {"/tmp": {"kind": "tmp"} }, "output_path": "/tmp", "priority": 1, "state": "Locked" -}`, nil, 0, func(t *TestDockerClient) { - t.ctrExited = true - time.Sleep(10 * time.Second) - t.logWriter.Close() +}`, nil, 0, func() { + s.executor.waitErr = errors.New("Container is not running") }) - c.Check(api.CalledWith("container.state", "Cancelled"), NotNil) - c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*") + c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) + c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*") } func (s *TestSuite) TestCrunchstat(c *C) { - api, _, _ := s.fullRunHelper(c, `{ + s.fullRunHelper(c, `{ "command": ["sleep", "1"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": ".", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -937,33 +791,32 @@ func (s *TestSuite) TestCrunchstat(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" - }`, nil, 0, func(t *TestDockerClient) { + }`, nil, 0, func() { time.Sleep(time.Second) - t.logWriter.Close() }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) // We didn't actually start a container, so crunchstat didn't // find accounting files and therefore didn't log any stats. 
// It should have logged a "can't find accounting files" // message after one poll interval, though, so we can confirm // it's alive: - c.Assert(api.Logs["crunchstat"], NotNil) - c.Check(api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files have not appeared after 100ms.*`) + c.Assert(s.api.Logs["crunchstat"], NotNil) + c.Check(s.api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files have not appeared after 100ms.*`) // The "files never appeared" log assures us that we called // (*crunchstat.Reporter)Stop(), and that we set it up with - // the correct container ID "abcde": - c.Check(api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files never appeared for abcde\n`) + // the correct container ID "cgroupid": + c.Check(s.api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files never appeared for cgroupid\n`) } func (s *TestSuite) TestNodeInfoLog(c *C) { os.Setenv("SLURMD_NODENAME", "compute2") - api, _, _ := s.fullRunHelper(c, `{ + s.fullRunHelper(c, `{ "command": ["sleep", "1"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": ".", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -972,22 +825,21 @@ func (s *TestSuite) TestNodeInfoLog(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" }`, nil, 0, - func(t *TestDockerClient) { + func() { time.Sleep(time.Second) - t.logWriter.Close() }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - c.Assert(api.Logs["node"], NotNil) - json := api.Logs["node"].String() + c.Assert(s.api.Logs["node"], NotNil) + json := s.api.Logs["node"].String() c.Check(json, Matches, `(?ms).*"uuid": *"zzzzz-7ekkf-2z3mc76g2q73aio".*`) c.Check(json, Matches, `(?ms).*"total_cpu_cores": *16.*`) c.Check(json, Not(Matches), `(?ms).*"info":.*`) - c.Assert(api.Logs["node-info"], NotNil) - json = api.Logs["node-info"].String() + c.Assert(s.api.Logs["node-info"], NotNil) + json = s.api.Logs["node-info"].String() c.Check(json, Matches, `(?ms).*Host Information.*`) c.Check(json, Matches, `(?ms).*CPU Information.*`) c.Check(json, Matches, `(?ms).*Memory Information.*`) @@ -996,9 +848,9 @@ func (s *TestSuite) TestNodeInfoLog(c *C) { } func (s *TestSuite) TestContainerRecordLog(c *C) { - api, _, _ := s.fullRunHelper(c, `{ + s.fullRunHelper(c, `{ "command": ["sleep", "1"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": ".", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -1007,22 +859,21 @@ func (s *TestSuite) TestContainerRecordLog(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" }`, nil, 0, - func(t *TestDockerClient) { + func() { time.Sleep(time.Second) - t.logWriter.Close() }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - c.Assert(api.Logs["container"], NotNil) - c.Check(api.Logs["container"].String(), Matches, `(?ms).*container_image.*`) + c.Assert(s.api.Logs["container"], NotNil) + c.Check(s.api.Logs["container"].String(), Matches, `(?ms).*container_image.*`) } func (s *TestSuite) TestFullRunStderr(c *C) { - api, _, _ := s.fullRunHelper(c, `{ + s.fullRunHelper(c, `{ "command": 
["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": ".", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -1030,25 +881,24 @@ func (s *TestSuite) TestFullRunStderr(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 1, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, "hello\n")) - t.logWriter.Write(dockerLog(2, "world\n")) - t.logWriter.Close() +}`, nil, 1, func() { + fmt.Fprintln(s.executor.created.Stdout, "hello") + fmt.Fprintln(s.executor.created.Stderr, "world") }) - final := api.CalledWith("container.state", "Complete") + final := s.api.CalledWith("container.state", "Complete") c.Assert(final, NotNil) c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1) c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil) - c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello\n"), Equals, true) - c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true) + c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello\n") + c.Check(s.api.Logs["stderr"].String(), Matches, ".*world\n") } func (s *TestSuite) TestFullRunDefaultCwd(c *C) { - api, _, _ := s.fullRunHelper(c, `{ + s.fullRunHelper(c, `{ "command": ["pwd"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": ".", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -1056,21 +906,20 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 0, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, t.cwd+"\n")) - t.logWriter.Close() +}`, nil, 0, func() { + fmt.Fprintf(s.executor.created.Stdout, "workdir=%q", s.executor.created.WorkingDir) }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) - c.Log(api.Logs["stdout"]) - c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true) + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) + c.Log(s.api.Logs["stdout"]) + c.Check(s.api.Logs["stdout"].String(), Matches, `.*workdir=""\n`) } func (s *TestSuite) TestFullRunSetCwd(c *C) { - api, _, _ := s.fullRunHelper(c, `{ + s.fullRunHelper(c, `{ "command": ["pwd"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": "/bin", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -1078,41 +927,37 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 0, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, t.cwd+"\n")) - t.logWriter.Close() +}`, nil, 0, func() { + fmt.Fprintln(s.executor.created.Stdout, s.executor.created.WorkingDir) }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) - c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true) + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) + c.Check(s.api.Logs["stdout"].String(), Matches, ".*/bin\n") } func (s *TestSuite) TestStopOnSignal(c *C) { - s.testStopContainer(c, func(cr *ContainerRunner) { - go func() { - for !s.docker.calledWait 
{ - time.Sleep(time.Millisecond) - } - cr.SigChan <- syscall.SIGINT - }() - }) + s.executor.runFunc = func() { + s.executor.created.Stdout.Write([]byte("foo\n")) + s.runner.SigChan <- syscall.SIGINT + } + s.testStopContainer(c) } func (s *TestSuite) TestStopOnArvMountDeath(c *C) { - s.testStopContainer(c, func(cr *ContainerRunner) { - cr.ArvMountExit = make(chan error) - go func() { - cr.ArvMountExit <- exec.Command("true").Run() - close(cr.ArvMountExit) - }() - }) + s.executor.runFunc = func() { + s.executor.created.Stdout.Write([]byte("foo\n")) + s.runner.ArvMountExit <- nil + close(s.runner.ArvMountExit) + } + s.runner.ArvMountExit = make(chan error) + s.testStopContainer(c) } -func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) { +func (s *TestSuite) testStopContainer(c *C) { record := `{ "command": ["/bin/sh", "-c", "echo foo && sleep 30 && echo bar"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "` + arvadostest.DockerImage112PDH + `", "cwd": ".", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -1122,31 +967,17 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) { "state": "Locked" }` - rec := arvados.Container{} - err := json.Unmarshal([]byte(record), &rec) - c.Check(err, IsNil) - - s.docker.fn = func(t *TestDockerClient) { - <-t.stop - t.logWriter.Write(dockerLog(1, "foo\n")) - t.logWriter.Close() - } - s.docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{}) - - api := &ArvTestClient{Container: rec} - kc := &KeepTestClient{} - defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + err := json.Unmarshal([]byte(record), &s.api.Container) c.Assert(err, IsNil) - cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil } - cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) { + + s.runner.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil } + s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) { return &ArvTestClient{}, &KeepTestClient{}, nil, nil } - setup(cr) done := make(chan error) go func() { - done <- cr.Run() + done <- s.runner.Run() }() select { case <-time.After(20 * time.Second): @@ -1155,20 +986,20 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) { case err = <-done: c.Check(err, IsNil) } - for k, v := range api.Logs { + for k, v := range s.api.Logs { c.Log(k) - c.Log(v.String()) + c.Log(v.String(), "\n") } - c.Check(api.CalledWith("container.log", nil), NotNil) - c.Check(api.CalledWith("container.state", "Cancelled"), NotNil) - c.Check(api.Logs["stdout"].String(), Matches, "(?ms).*foo\n$") + c.Check(s.api.CalledWith("container.log", nil), NotNil) + c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) + c.Check(s.api.Logs["stdout"].String(), Matches, "(?ms).*foo\n$") } func (s *TestSuite) TestFullRunSetEnv(c *C) { - api, _, _ := s.fullRunHelper(c, `{ + s.fullRunHelper(c, `{ "command": ["/bin/sh", "-c", "echo $FROBIZ"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": "/bin", "environment": {"FROBIZ": "bilbo"}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -1176,14 +1007,13 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 0, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, 
t.env[0][7:]+"\n")) - t.logWriter.Close() +}`, nil, 0, func() { + fmt.Fprintf(s.executor.created.Stdout, "%v", s.executor.created.Env) }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) - c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "bilbo\n"), Equals, true) + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) + c.Check(s.api.Logs["stdout"].String(), Matches, `.*map\[FROBIZ:bilbo\]\n`) } type ArvMountCmdLine struct { @@ -1206,27 +1036,17 @@ func stubCert(temp string) string { } func (s *TestSuite) TestSetupMounts(c *C) { - api := &ArvTestClient{} - kc := &KeepTestClient{} - defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") - c.Assert(err, IsNil) + cr := s.runner am := &ArvMountCmdLine{} cr.RunArvMount = am.ArvMountTest cr.ContainerArvClient = &ArvTestClient{} cr.ContainerKeepClient = &KeepTestClient{} - realTemp, err := ioutil.TempDir("", "crunchrun_test1-") - c.Assert(err, IsNil) - certTemp, err := ioutil.TempDir("", "crunchrun_test2-") - c.Assert(err, IsNil) + realTemp := c.MkDir() + certTemp := c.MkDir() stubCertPath := stubCert(certTemp) - cr.parentTemp = realTemp - defer os.RemoveAll(realTemp) - defer os.RemoveAll(certTemp) - i := 0 cr.MkTempDir = func(_ string, prefix string) (string, error) { i++ @@ -1255,12 +1075,12 @@ func (s *TestSuite) TestSetupMounts(c *C) { cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"} cr.Container.OutputPath = "/tmp" cr.statInterval = 5 * time.Second - err := cr.SetupMounts() + bindmounts, err := cr.SetupMounts() c.Check(err, IsNil) c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--crunchstat-interval=5", "--mount-by-pdh", "by_id", realTemp + "/keep1"}) - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp"}) + c.Check(bindmounts, DeepEquals, map[string]bindmount{"/tmp": {realTemp + "/tmp2", false}}) os.RemoveAll(cr.ArvMountPoint) cr.CleanupDirs() checkEmpty() @@ -1274,12 +1094,12 @@ func (s *TestSuite) TestSetupMounts(c *C) { cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"} cr.Container.OutputPath = "/out" - err := cr.SetupMounts() + bindmounts, err := cr.SetupMounts() c.Check(err, IsNil) c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--crunchstat-interval=5", "--mount-by-pdh", "by_id", realTemp + "/keep1"}) - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/out", realTemp + "/tmp3:/tmp"}) + c.Check(bindmounts, DeepEquals, map[string]bindmount{"/out": {realTemp + "/tmp2", false}, "/tmp": {realTemp + "/tmp3", false}}) os.RemoveAll(cr.ArvMountPoint) cr.CleanupDirs() checkEmpty() @@ -1293,12 +1113,12 @@ func (s *TestSuite) TestSetupMounts(c *C) { cr.Container.OutputPath = "/tmp" cr.Container.RuntimeConstraints.API = true - err := cr.SetupMounts() + bindmounts, err := cr.SetupMounts() c.Check(err, IsNil) c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--crunchstat-interval=5", "--mount-by-pdh", "by_id", realTemp + "/keep1"}) - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp", stubCertPath + ":/etc/arvados/ca-certificates.crt:ro"}) + c.Check(bindmounts, DeepEquals, map[string]bindmount{"/tmp": {realTemp + "/tmp2", false}, "/etc/arvados/ca-certificates.crt": {stubCertPath, true}}) os.RemoveAll(cr.ArvMountPoint) cr.CleanupDirs() checkEmpty() @@ -1316,12 +1136,12 @@ func (s *TestSuite) 
TestSetupMounts(c *C) { os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm) - err := cr.SetupMounts() + bindmounts, err := cr.SetupMounts() c.Check(err, IsNil) c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--crunchstat-interval=5", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"}) - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/tmp0:/keeptmp"}) + c.Check(bindmounts, DeepEquals, map[string]bindmount{"/keeptmp": {realTemp + "/keep1/tmp0", false}}) os.RemoveAll(cr.ArvMountPoint) cr.CleanupDirs() checkEmpty() @@ -1339,14 +1159,15 @@ func (s *TestSuite) TestSetupMounts(c *C) { os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm) os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm) - err := cr.SetupMounts() + bindmounts, err := cr.SetupMounts() c.Check(err, IsNil) c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--crunchstat-interval=5", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"}) - sort.StringSlice(cr.Binds).Sort() - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro", - realTemp + "/keep1/tmp0:/keepout"}) + c.Check(bindmounts, DeepEquals, map[string]bindmount{ + "/keepinp": {realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", true}, + "/keepout": {realTemp + "/keep1/tmp0", false}, + }) os.RemoveAll(cr.ArvMountPoint) cr.CleanupDirs() checkEmpty() @@ -1365,14 +1186,15 @@ func (s *TestSuite) TestSetupMounts(c *C) { os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm) os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm) - err := cr.SetupMounts() + bindmounts, err := cr.SetupMounts() c.Check(err, IsNil) c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--crunchstat-interval=5", "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"}) - sort.StringSlice(cr.Binds).Sort() - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro", - realTemp + "/keep1/tmp0:/keepout"}) + c.Check(bindmounts, DeepEquals, map[string]bindmount{ + "/keepinp": {realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", true}, + "/keepout": {realTemp + "/keep1/tmp0", false}, + }) os.RemoveAll(cr.ArvMountPoint) cr.CleanupDirs() checkEmpty() @@ -1391,10 +1213,11 @@ func (s *TestSuite) TestSetupMounts(c *C) { cr.Container.Mounts = map[string]arvados.Mount{ "/mnt/test.json": {Kind: "json", Content: test.in}, } - err := cr.SetupMounts() + bindmounts, err := cr.SetupMounts() c.Check(err, IsNil) - sort.StringSlice(cr.Binds).Sort() - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/json2/mountdata.json:/mnt/test.json:ro"}) + c.Check(bindmounts, DeepEquals, map[string]bindmount{ + "/mnt/test.json": {realTemp + "/json2/mountdata.json", true}, + }) content, err := ioutil.ReadFile(realTemp + "/json2/mountdata.json") c.Check(err, IsNil) c.Check(content, DeepEquals, []byte(test.out)) @@ -1416,13 +1239,14 @@ func (s *TestSuite) TestSetupMounts(c *C) { cr.Container.Mounts = map[string]arvados.Mount{ "/mnt/test.txt": {Kind: "text", Content: test.in}, } - err := cr.SetupMounts() + bindmounts, err := cr.SetupMounts() if test.out == "error" { c.Check(err.Error(), Equals, "content for mount \"/mnt/test.txt\" must be a string") } else { c.Check(err, IsNil) - sort.StringSlice(cr.Binds).Sort() - c.Check(cr.Binds, DeepEquals, []string{realTemp + 
"/text2/mountdata.text:/mnt/test.txt:ro"}) + c.Check(bindmounts, DeepEquals, map[string]bindmount{ + "/mnt/test.txt": {realTemp + "/text2/mountdata.text", true}, + }) content, err := ioutil.ReadFile(realTemp + "/text2/mountdata.text") c.Check(err, IsNil) c.Check(content, DeepEquals, []byte(test.out)) @@ -1445,12 +1269,15 @@ func (s *TestSuite) TestSetupMounts(c *C) { os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm) - err := cr.SetupMounts() + bindmounts, err := cr.SetupMounts() c.Check(err, IsNil) c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--crunchstat-interval=5", "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"}) - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp", realTemp + "/keep1/tmp0:/tmp/foo:ro"}) + c.Check(bindmounts, DeepEquals, map[string]bindmount{ + "/tmp": {realTemp + "/tmp2", false}, + "/tmp/foo": {realTemp + "/keep1/tmp0", true}, + }) os.RemoveAll(cr.ArvMountPoint) cr.CleanupDirs() checkEmpty() @@ -1480,7 +1307,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { rf.Write([]byte("bar")) rf.Close() - err := cr.SetupMounts() + _, err := cr.SetupMounts() c.Check(err, IsNil) _, err = os.Stat(cr.HostOutputDir + "/foo") c.Check(err, IsNil) @@ -1502,7 +1329,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { } cr.Container.OutputPath = "/tmp" - err := cr.SetupMounts() + _, err := cr.SetupMounts() c.Check(err, NotNil) c.Check(err, ErrorMatches, `only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path.*`) os.RemoveAll(cr.ArvMountPoint) @@ -1519,7 +1346,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { "stdin": {Kind: "tmp"}, } - err := cr.SetupMounts() + _, err := cr.SetupMounts() c.Check(err, NotNil) c.Check(err, ErrorMatches, `unsupported mount kind 'tmp' for stdin.*`) os.RemoveAll(cr.ArvMountPoint) @@ -1550,34 +1377,24 @@ func (s *TestSuite) TestSetupMounts(c *C) { } cr.Container.OutputPath = "/tmp" - err := cr.SetupMounts() + bindmounts, err := cr.SetupMounts() c.Check(err, IsNil) - // dirMap[mountpoint] == tmpdir - dirMap := make(map[string]string) - for _, bind := range cr.Binds { - tokens := strings.Split(bind, ":") - dirMap[tokens[1]] = tokens[0] - - if cr.Container.Mounts[tokens[1]].Writable { - c.Check(len(tokens), Equals, 2) - } else { - c.Check(len(tokens), Equals, 3) - c.Check(tokens[2], Equals, "ro") - } + for path, mount := range bindmounts { + c.Check(mount.ReadOnly, Equals, !cr.Container.Mounts[path].Writable, Commentf("%s %#v", path, mount)) } - data, err := ioutil.ReadFile(dirMap["/tip"] + "/dir1/dir2/file with mode 0644") + data, err := ioutil.ReadFile(bindmounts["/tip"].HostPath + "/dir1/dir2/file with mode 0644") c.Check(err, IsNil) c.Check(string(data), Equals, "\000\001\002\003") - _, err = ioutil.ReadFile(dirMap["/tip"] + "/file only on testbranch") + _, err = ioutil.ReadFile(bindmounts["/tip"].HostPath + "/file only on testbranch") c.Check(err, FitsTypeOf, &os.PathError{}) c.Check(os.IsNotExist(err), Equals, true) - data, err = ioutil.ReadFile(dirMap["/non-tip"] + "/dir1/dir2/file with mode 0644") + data, err = ioutil.ReadFile(bindmounts["/non-tip"].HostPath + "/dir1/dir2/file with mode 0644") c.Check(err, IsNil) c.Check(string(data), Equals, "\000\001\002\003") - data, err = ioutil.ReadFile(dirMap["/non-tip"] + "/file only on testbranch") + data, err = ioutil.ReadFile(bindmounts["/non-tip"].HostPath + "/file only on testbranch") c.Check(err, IsNil) c.Check(string(data), Equals, "testfile\n") @@ -1589,7 +1406,7 @@ func (s 
*TestSuite) TestSetupMounts(c *C) { func (s *TestSuite) TestStdout(c *C) { helperRecord := `{ "command": ["/bin/sh", "-c", "echo $FROBIZ"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "` + arvadostest.DockerImage112PDH + `", "cwd": "/bin", "environment": {"FROBIZ": "bilbo"}, "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"} }, @@ -1599,38 +1416,25 @@ func (s *TestSuite) TestStdout(c *C) { "state": "Locked" }` - api, cr, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) - t.logWriter.Close() + s.fullRunHelper(c, helperRecord, nil, 0, func() { + fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"]) }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) - c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil) + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) + c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil) } // Used by the TestStdoutWithWrongPath*() -func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) { - rec := arvados.Container{} - err = json.Unmarshal([]byte(record), &rec) - c.Check(err, IsNil) - - s.docker.fn = fn - s.docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{}) - - api = &ArvTestClient{Container: rec} - kc := &KeepTestClient{} - defer kc.Close() - cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") +func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func()) (*ArvTestClient, *ContainerRunner, error) { + err := json.Unmarshal([]byte(record), &s.api.Container) c.Assert(err, IsNil) - am := &ArvMountCmdLine{} - cr.RunArvMount = am.ArvMountTest - cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) { - return &ArvTestClient{}, &KeepTestClient{}, nil, nil + s.executor.runFunc = fn + s.runner.RunArvMount = (&ArvMountCmdLine{}).ArvMountTest + s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) { + return s.api, &KeepTestClient{}, nil, nil } - - err = cr.Run() - return + return s.api, s.runner, s.runner.Run() } func (s *TestSuite) TestStdoutWithWrongPath(c *C) { @@ -1638,10 +1442,8 @@ func (s *TestSuite) TestStdoutWithWrongPath(c *C) { "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} }, "output_path": "/tmp", "state": "Locked" -}`, func(t *TestDockerClient) {}) - - c.Check(err, NotNil) - c.Check(strings.Contains(err.Error(), "Stdout path does not start with OutputPath"), Equals, true) +}`, func() {}) + c.Check(err, ErrorMatches, ".*Stdout path does not start with OutputPath.*") } func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) { @@ -1649,10 +1451,8 @@ func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) { "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} }, "output_path": "/tmp", "state": "Locked" -}`, func(t *TestDockerClient) {}) - - c.Check(err, NotNil) - c.Check(strings.Contains(err.Error(), "unsupported mount kind 'tmp' for stdout"), Equals, true) +}`, 
func() {}) + c.Check(err, ErrorMatches, ".*unsupported mount kind 'tmp' for stdout.*") } func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) { @@ -1660,18 +1460,14 @@ func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) { "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} }, "output_path": "/tmp", "state": "Locked" -}`, func(t *TestDockerClient) {}) - - c.Check(err, NotNil) - c.Check(strings.Contains(err.Error(), "unsupported mount kind 'collection' for stdout"), Equals, true) +}`, func() {}) + c.Check(err, ErrorMatches, ".*unsupported mount kind 'collection' for stdout.*") } func (s *TestSuite) TestFullRunWithAPI(c *C) { - defer os.Setenv("ARVADOS_API_HOST", os.Getenv("ARVADOS_API_HOST")) - os.Setenv("ARVADOS_API_HOST", "test.arvados.org") - api, _, _ := s.fullRunHelper(c, `{ - "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + s.fullRunHelper(c, `{ + "command": ["/bin/sh", "-c", "true $ARVADOS_API_HOST"], + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": "/bin", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -1679,23 +1475,20 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) { "priority": 1, "runtime_constraints": {"API": true}, "state": "Locked" -}`, nil, 0, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, t.env[1][17:]+"\n")) - t.logWriter.Close() +}`, nil, 0, func() { + c.Check(s.executor.created.Env["ARVADOS_API_HOST"], Equals, os.Getenv("ARVADOS_API_HOST")) + s.executor.exit <- 3 }) - - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) - c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "test.arvados.org\n"), Equals, true) - c.Check(api.CalledWith("container.output", "d41d8cd98f00b204e9800998ecf8427e+0"), NotNil) + c.Check(s.api.CalledWith("container.exit_code", 3), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) } func (s *TestSuite) TestFullRunSetOutput(c *C) { defer os.Setenv("ARVADOS_API_HOST", os.Getenv("ARVADOS_API_HOST")) os.Setenv("ARVADOS_API_HOST", "test.arvados.org") - api, _, _ := s.fullRunHelper(c, `{ + s.fullRunHelper(c, `{ "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": "/bin", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -1703,20 +1496,19 @@ func (s *TestSuite) TestFullRunSetOutput(c *C) { "priority": 1, "runtime_constraints": {"API": true}, "state": "Locked" -}`, nil, 0, func(t *TestDockerClient) { - t.api.Container.Output = "d4ab34d3d4f8a72f5c4973051ae69fab+122" - t.logWriter.Close() +}`, nil, 0, func() { + s.api.Container.Output = arvadostest.DockerImage112PDH }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) - c.Check(api.CalledWith("container.output", "d4ab34d3d4f8a72f5c4973051ae69fab+122"), NotNil) + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) + c.Check(s.api.CalledWith("container.output", arvadostest.DockerImage112PDH), NotNil) } func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C) { helperRecord := `{ "command": ["/bin/sh", "-c", "echo $FROBIZ"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "` + arvadostest.DockerImage112PDH + `", 
"cwd": "/bin", "environment": {"FROBIZ": "bilbo"}, "mounts": { @@ -1735,20 +1527,19 @@ func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"} - api, cr, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) - t.logWriter.Close() + s.fullRunHelper(c, helperRecord, extraMounts, 0, func() { + fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"]) }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) - c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil) + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) + c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil) } func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) { helperRecord := `{ "command": ["/bin/sh", "-c", "echo $FROBIZ"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "` + arvadostest.DockerImage112PDH + `", "cwd": "/bin", "environment": {"FROBIZ": "bilbo"}, "mounts": { @@ -1771,16 +1562,16 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) { "a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt", } - api, runner, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) - t.logWriter.Close() + api, _, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func() { + fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"]) }) - c.Check(runner.Binds, DeepEquals, []string{realtemp + "/tmp2:/tmp", - realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/file2_in_main.txt:/tmp/foo/bar:ro", - realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt:/tmp/foo/baz/sub2file2:ro", - realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1:/tmp/foo/sub1:ro", - realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt:/tmp/foo/sub1file2:ro", + c.Check(s.executor.created.BindMounts, DeepEquals, map[string]bindmount{ + "/tmp": {realtemp + "/tmp1", false}, + "/tmp/foo/bar": {s.keepmount + "/by_id/a0def87f80dd594d4675809e83bd4f15+367/file2_in_main.txt", true}, + "/tmp/foo/baz/sub2file2": {s.keepmount + "/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt", true}, + "/tmp/foo/sub1": {s.keepmount + "/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1", true}, + "/tmp/foo/sub1file2": {s.keepmount + "/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt", true}, }) c.Check(api.CalledWith("container.exit_code", 0), NotNil) @@ -1806,7 +1597,7 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) { func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(c *C) { helperRecord := `{ "command": ["/bin/sh", "-c", "echo $FROBIZ"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "` + arvadostest.DockerImage112PDH + `", "cwd": "/bin", "environment": {"FROBIZ": "bilbo"}, "mounts": { @@ -1824,14 +1615,13 @@ func 
(s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest( "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt", } - api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) - t.logWriter.Close() + s.fullRunHelper(c, helperRecord, extraMounts, 0, func() { + fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"]) }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) - for _, v := range api.Content { + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) + for _, v := range s.api.Content { if v["collection"] != nil { collection := v["collection"].(arvadosclient.Dict) if strings.Index(collection["name"].(string), "output") == 0 { @@ -1848,7 +1638,7 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest( func (s *TestSuite) TestOutputError(c *C) { helperRecord := `{ "command": ["/bin/sh", "-c", "echo $FROBIZ"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "` + arvadostest.DockerImage112PDH + `", "cwd": "/bin", "environment": {"FROBIZ": "bilbo"}, "mounts": { @@ -1859,21 +1649,17 @@ func (s *TestSuite) TestOutputError(c *C) { "runtime_constraints": {}, "state": "Locked" }` - - extraMounts := []string{} - - api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { - os.Symlink("/etc/hosts", t.realTemp+"/tmp2/baz") - t.logWriter.Close() + s.fullRunHelper(c, helperRecord, nil, 0, func() { + os.Symlink("/etc/hosts", s.runner.HostOutputDir+"/baz") }) - c.Check(api.CalledWith("container.state", "Cancelled"), NotNil) + c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) } func (s *TestSuite) TestStdinCollectionMountPoint(c *C) { helperRecord := `{ "command": ["/bin/sh", "-c", "echo $FROBIZ"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "` + arvadostest.DockerImage112PDH + `", "cwd": "/bin", "environment": {"FROBIZ": "bilbo"}, "mounts": { @@ -1891,9 +1677,8 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) { "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt", } - api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) - t.logWriter.Close() + api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func() { + fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"]) }) c.Check(api.CalledWith("container.exit_code", 0), NotNil) @@ -1913,7 +1698,7 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) { func (s *TestSuite) TestStdinJsonMountPoint(c *C) { helperRecord := `{ "command": ["/bin/sh", "-c", "echo $FROBIZ"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "` + arvadostest.DockerImage112PDH + `", "cwd": "/bin", "environment": {"FROBIZ": "bilbo"}, "mounts": { @@ -1927,9 +1712,8 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) { "state": "Locked" }` - api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) - t.logWriter.Close() + api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func() { + fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"]) }) c.Check(api.CalledWith("container.exit_code", 0), NotNil) @@ -1949,7 
+1733,7 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) { func (s *TestSuite) TestStderrMount(c *C) { api, cr, _ := s.fullRunHelper(c, `{ "command": ["/bin/sh", "-c", "echo hello;exit 1"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": ".", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"}, @@ -1959,10 +1743,9 @@ func (s *TestSuite) TestStderrMount(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 1, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, "hello\n")) - t.logWriter.Write(dockerLog(2, "oops\n")) - t.logWriter.Close() +}`, nil, 1, func() { + fmt.Fprintln(s.executor.created.Stdout, "hello") + fmt.Fprintln(s.executor.created.Stderr, "oops") }) final := api.CalledWith("container.state", "Complete") @@ -1974,131 +1757,37 @@ func (s *TestSuite) TestStderrMount(c *C) { } func (s *TestSuite) TestNumberRoundTrip(c *C) { - kc := &KeepTestClient{} - defer kc.Close() - cr, err := NewContainerRunner(s.client, &ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + s.api.callraw = true + err := s.runner.fetchContainerRecord() c.Assert(err, IsNil) - cr.fetchContainerRecord() - - jsondata, err := json.Marshal(cr.Container.Mounts["/json"].Content) - + jsondata, err := json.Marshal(s.runner.Container.Mounts["/json"].Content) + c.Logf("%#v", s.runner.Container) c.Check(err, IsNil) c.Check(string(jsondata), Equals, `{"number":123456789123456789}`) } -func (s *TestSuite) TestFullBrokenDocker1(c *C) { - tf, err := ioutil.TempFile("", "brokenNodeHook-") - c.Assert(err, IsNil) - defer os.Remove(tf.Name()) - - tf.Write([]byte(`#!/bin/sh -exec echo killme -`)) - tf.Close() - os.Chmod(tf.Name(), 0700) - - ech := tf.Name() - brokenNodeHook = &ech - - api, _, _ := s.fullRunHelper(c, `{ - "command": ["echo", "hello world"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", - "cwd": ".", - "environment": {}, - "mounts": {"/tmp": {"kind": "tmp"} }, - "output_path": "/tmp", - "priority": 1, - "runtime_constraints": {}, - "state": "Locked" -}`, nil, 2, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, "hello world\n")) - t.logWriter.Close() - }) - - c.Check(api.CalledWith("container.state", "Queued"), NotNil) - c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*") - c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*") - c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*") - -} - -func (s *TestSuite) TestFullBrokenDocker2(c *C) { - ech := "" - brokenNodeHook = &ech - - api, _, _ := s.fullRunHelper(c, `{ - "command": ["echo", "hello world"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", - "cwd": ".", - "environment": {}, - "mounts": {"/tmp": {"kind": "tmp"} }, - "output_path": "/tmp", - "priority": 1, - "runtime_constraints": {}, - "state": "Locked" -}`, nil, 2, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, "hello world\n")) - t.logWriter.Close() - }) - - c.Check(api.CalledWith("container.state", "Queued"), NotNil) - c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*") - c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*") -} - -func (s *TestSuite) TestFullBrokenDocker3(c *C) { - ech := "" - brokenNodeHook = &ech - - api, _, _ := s.fullRunHelper(c, `{ - "command": ["echo", "hello world"], - "container_image": 
"d4ab34d3d4f8a72f5c4973051ae69fab+122", - "cwd": ".", - "environment": {}, - "mounts": {"/tmp": {"kind": "tmp"} }, - "output_path": "/tmp", - "priority": 1, - "runtime_constraints": {}, - "state": "Locked" -}`, nil, 3, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, "hello world\n")) - t.logWriter.Close() - }) - - c.Check(api.CalledWith("container.state", "Cancelled"), NotNil) - c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*") -} - -func (s *TestSuite) TestBadCommand1(c *C) { - ech := "" - brokenNodeHook = &ech - - api, _, _ := s.fullRunHelper(c, `{ - "command": ["echo", "hello world"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", - "cwd": ".", - "environment": {}, - "mounts": {"/tmp": {"kind": "tmp"} }, - "output_path": "/tmp", - "priority": 1, - "runtime_constraints": {}, - "state": "Locked" -}`, nil, 4, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, "hello world\n")) - t.logWriter.Close() - }) - - c.Check(api.CalledWith("container.state", "Cancelled"), NotNil) - c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*") -} - -func (s *TestSuite) TestBadCommand2(c *C) { - ech := "" - brokenNodeHook = &ech - - api, _, _ := s.fullRunHelper(c, `{ +func (s *TestSuite) TestFullBrokenDocker(c *C) { + nextState := "" + for _, setup := range []func(){ + func() { + c.Log("// waitErr = ocl runtime error") + s.executor.waitErr = errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`) + nextState = "Cancelled" + }, + func() { + c.Log("// loadErr = cannot connect") + s.executor.loadErr = errors.New("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. 
Is the docker daemon running?") + *brokenNodeHook = c.MkDir() + "/broken-node-hook" + err := ioutil.WriteFile(*brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700) + c.Assert(err, IsNil) + nextState = "Queued" + }, + } { + s.SetUpTest(c) + setup() + s.fullRunHelper(c, `{ "command": ["echo", "hello world"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": ".", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -2106,22 +1795,30 @@ func (s *TestSuite) TestBadCommand2(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 5, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, "hello world\n")) - t.logWriter.Close() - }) - - c.Check(api.CalledWith("container.state", "Cancelled"), NotNil) - c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*") +}`, nil, 0, func() {}) + c.Check(s.api.CalledWith("container.state", nextState), NotNil) + c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*") + if *brokenNodeHook != "" { + c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*") + c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*") + c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*") + } else { + c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*") + } + } } -func (s *TestSuite) TestBadCommand3(c *C) { - ech := "" - brokenNodeHook = &ech - - api, _, _ := s.fullRunHelper(c, `{ +func (s *TestSuite) TestBadCommand(c *C) { + for _, startError := range []string{ + `panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`, + `Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`, + `Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`, + } { + s.SetUpTest(c) + s.executor.startErr = errors.New(startError) + s.fullRunHelper(c, `{ "command": ["echo", "hello world"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "`+arvadostest.DockerImage112PDH+`", "cwd": ".", "environment": {}, "mounts": {"/tmp": {"kind": "tmp"} }, @@ -2129,20 +1826,16 @@ func (s *TestSuite) TestBadCommand3(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 6, func(t *TestDockerClient) { - t.logWriter.Write(dockerLog(1, "hello world\n")) - t.logWriter.Close() - }) - - c.Check(api.CalledWith("container.state", "Cancelled"), NotNil) - c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*") +}`, nil, 0, func() {}) + c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) + c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*") + } } func (s *TestSuite) TestSecretTextMountPoint(c *C) { - // under normal mounts, gets captured in output, oops helperRecord := `{ "command": ["true"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "` + arvadostest.DockerImage112PDH + `", "cwd": "/bin", "mounts": { "/tmp": {"kind": "tmp"}, @@ -2156,22 +1849,21 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) { "state": "Locked" 
}` - api, cr, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) { - content, err := ioutil.ReadFile(t.realTemp + "/tmp2/secret.conf") + s.fullRunHelper(c, helperRecord, nil, 0, func() { + content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf") c.Check(err, IsNil) - c.Check(content, DeepEquals, []byte("mypassword")) - t.logWriter.Close() + c.Check(string(content), Equals, "mypassword") }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) - c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), NotNil) - c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), IsNil) + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) + c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), NotNil) + c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), IsNil) // under secret mounts, not captured in output helperRecord = `{ "command": ["true"], - "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "container_image": "` + arvadostest.DockerImage112PDH + `", "cwd": "/bin", "mounts": { "/tmp": {"kind": "tmp"} @@ -2185,17 +1877,17 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) { "state": "Locked" }` - api, cr, _ = s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) { - content, err := ioutil.ReadFile(t.realTemp + "/tmp2/secret.conf") + s.SetUpTest(c) + s.fullRunHelper(c, helperRecord, nil, 0, func() { + content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf") c.Check(err, IsNil) - c.Check(content, DeepEquals, []byte("mypassword")) - t.logWriter.Close() + c.Check(string(content), Equals, "mypassword") }) - c.Check(api.CalledWith("container.exit_code", 0), NotNil) - c.Check(api.CalledWith("container.state", "Complete"), NotNil) - c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), IsNil) - c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), NotNil) + c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) + c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) + c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), IsNil) + c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), NotNil) } type FakeProcess struct { diff --git a/lib/crunchrun/docker.go b/lib/crunchrun/docker.go new file mode 100644 index 0000000000..32c60e699a --- /dev/null +++ b/lib/crunchrun/docker.go @@ -0,0 +1,263 @@ +// Copyright (C) The Arvados Authors. All rights reserved. 
+// +// SPDX-License-Identifier: AGPL-3.0 +package crunchrun + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "strings" + "time" + + dockertypes "github.com/docker/docker/api/types" + dockercontainer "github.com/docker/docker/api/types/container" + dockerclient "github.com/docker/docker/client" + "golang.org/x/net/context" +) + +// Docker daemon won't let you set a limit less than ~10 MiB +const minDockerRAM = int64(16 * 1024 * 1024) + +type dockerExecutor struct { + containerUUID string + logf func(string, ...interface{}) + watchdogInterval time.Duration + dockerclient *dockerclient.Client + containerID string + doneIO chan struct{} + errIO error +} + +func newDockerExecutor(containerUUID string, logf func(string, ...interface{}), watchdogInterval time.Duration) (*dockerExecutor, error) { + // API version 1.21 corresponds to Docker 1.9, which is + // currently the minimum version we want to support. + client, err := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) + if watchdogInterval < 1 { + watchdogInterval = time.Minute + } + return &dockerExecutor{ + containerUUID: containerUUID, + logf: logf, + watchdogInterval: watchdogInterval, + dockerclient: client, + }, err +} + +func (e *dockerExecutor) ImageLoaded(imageID string) bool { + _, _, err := e.dockerclient.ImageInspectWithRaw(context.TODO(), imageID) + return err == nil +} + +func (e *dockerExecutor) LoadImage(filename string) error { + f, err := os.Open(filename) + if err != nil { + return err + } + defer f.Close() + resp, err := e.dockerclient.ImageLoad(context.TODO(), f, true) + if err != nil { + return fmt.Errorf("While loading container image into Docker: %v", err) + } + defer resp.Body.Close() + buf, _ := ioutil.ReadAll(resp.Body) + e.logf("loaded image: response %s", buf) + return nil +} + +func (e *dockerExecutor) Create(spec containerSpec) error { + e.logf("Creating Docker container") + cfg := dockercontainer.Config{ + Image: spec.Image, + Cmd: spec.Command, + WorkingDir: spec.WorkingDir, + Volumes: map[string]struct{}{}, + OpenStdin: spec.Stdin != nil, + StdinOnce: spec.Stdin != nil, + AttachStdin: spec.Stdin != nil, + AttachStdout: true, + AttachStderr: true, + } + if cfg.WorkingDir == "." 
{ + cfg.WorkingDir = "" + } + for k, v := range spec.Env { + cfg.Env = append(cfg.Env, k+"="+v) + } + if spec.RAM < minDockerRAM { + spec.RAM = minDockerRAM + } + hostCfg := dockercontainer.HostConfig{ + LogConfig: dockercontainer.LogConfig{ + Type: "none", + }, + NetworkMode: dockercontainer.NetworkMode("none"), + Resources: dockercontainer.Resources{ + CgroupParent: spec.CgroupParent, + NanoCPUs: int64(spec.VCPUs) * 1000000000, + Memory: spec.RAM, // RAM + MemorySwap: spec.RAM, // RAM+swap + KernelMemory: spec.RAM, // kernel portion + }, + } + for path, mount := range spec.BindMounts { + bind := mount.HostPath + ":" + path + if mount.ReadOnly { + bind += ":ro" + } + hostCfg.Binds = append(hostCfg.Binds, bind) + } + if spec.EnableNetwork { + hostCfg.NetworkMode = dockercontainer.NetworkMode(spec.NetworkMode) + } + + created, err := e.dockerclient.ContainerCreate(context.TODO(), &cfg, &hostCfg, nil, e.containerUUID) + if err != nil { + return fmt.Errorf("While creating container: %v", err) + } + e.containerID = created.ID + return e.startIO(spec.Stdin, spec.Stdout, spec.Stderr) +} + +func (e *dockerExecutor) CgroupID() string { + return e.containerID +} + +func (e *dockerExecutor) Start() error { + return e.dockerclient.ContainerStart(context.TODO(), e.containerID, dockertypes.ContainerStartOptions{}) +} + +func (e *dockerExecutor) Stop() error { + err := e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true}) + if err != nil && strings.Contains(err.Error(), "No such container: "+e.containerID) { + err = nil + } + return err +} + +// Wait for the container to terminate, capture the exit code, and +// wait for stdout/stderr logging to finish. +func (e *dockerExecutor) Wait(ctx context.Context) (int, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + watchdogErr := make(chan error, 1) + go func() { + ticker := time.NewTicker(e.watchdogInterval) + defer ticker.Stop() + for range ticker.C { + dctx, dcancel := context.WithDeadline(ctx, time.Now().Add(e.watchdogInterval)) + ctr, err := e.dockerclient.ContainerInspect(dctx, e.containerID) + dcancel() + if ctx.Err() != nil { + // Either the container already + // exited, or our caller is trying to + // kill it. 
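+				// In either case the select loop in
+				// Wait() below handles the shutdown,
+				// so the watchdog can simply stop.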
+ return + } else if err != nil { + e.logf("Error inspecting container: %s", err) + watchdogErr <- err + return + } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") { + watchdogErr <- fmt.Errorf("Container is not running: State=%v", ctr.State) + return + } + } + }() + + waitOk, waitErr := e.dockerclient.ContainerWait(ctx, e.containerID, dockercontainer.WaitConditionNotRunning) + for { + select { + case waitBody := <-waitOk: + e.logf("Container exited with code: %v", waitBody.StatusCode) + // wait for stdout/stderr to complete + <-e.doneIO + return int(waitBody.StatusCode), nil + + case err := <-waitErr: + return -1, fmt.Errorf("container wait: %v", err) + + case <-ctx.Done(): + return -1, ctx.Err() + + case err := <-watchdogErr: + return -1, err + } + } +} + +func (e *dockerExecutor) startIO(stdin io.ReadCloser, stdout, stderr io.WriteCloser) error { + resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{ + Stream: true, + Stdin: stdin != nil, + Stdout: true, + Stderr: true, + }) + if err != nil { + return fmt.Errorf("error attaching container stdin/stdout/stderr streams: %v", err) + } + var errStdin error + if stdin != nil { + go func() { + errStdin = e.handleStdin(stdin, resp.Conn, resp.CloseWrite) + }() + } + e.doneIO = make(chan struct{}) + go func() { + e.errIO = e.handleStdoutStderr(stdout, stderr, resp.Reader) + if e.errIO == nil && errStdin != nil { + e.errIO = errStdin + } + close(e.doneIO) + }() + return nil +} + +func (e *dockerExecutor) handleStdin(stdin io.ReadCloser, conn io.Writer, closeConn func() error) error { + defer stdin.Close() + defer closeConn() + _, err := io.Copy(conn, stdin) + if err != nil { + return fmt.Errorf("While writing to docker container on stdin: %v", err) + } + return nil +} + +// Handle docker log protocol; see +// https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container +func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.WriteCloser, reader io.Reader) error { + header := make([]byte, 8) + var err error + for err == nil { + _, err = io.ReadAtLeast(reader, header, 8) + if err != nil { + if err == io.EOF { + err = nil + } + break + } + readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24) + if header[0] == 1 { + _, err = io.CopyN(stdout, reader, readsize) + } else { + // stderr + _, err = io.CopyN(stderr, reader, readsize) + } + } + if err != nil { + return fmt.Errorf("error copying stdout/stderr from docker: %v", err) + } + err = stdout.Close() + if err != nil { + return fmt.Errorf("error writing stdout: close: %v", err) + } + err = stderr.Close() + if err != nil { + return fmt.Errorf("error writing stderr: close: %v", err) + } + return nil +} + +func (e *dockerExecutor) Close() { + e.dockerclient.ContainerRemove(context.TODO(), e.containerID, dockertypes.ContainerRemoveOptions{Force: true}) +} diff --git a/lib/crunchrun/docker_test.go b/lib/crunchrun/docker_test.go new file mode 100644 index 0000000000..28eb59546a --- /dev/null +++ b/lib/crunchrun/docker_test.go @@ -0,0 +1,31 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package crunchrun + +import ( + "os/exec" + "time" + + . 
"gopkg.in/check.v1" +) + +var _ = Suite(&dockerSuite{}) + +type dockerSuite struct { + executorSuite +} + +func (s *dockerSuite) SetUpSuite(c *C) { + _, err := exec.LookPath("docker") + if err != nil { + c.Skip("looks like docker is not installed") + } + s.newExecutor = func(c *C) { + exec.Command("docker", "rm", "zzzzz-zzzzz-zzzzzzzzzzzzzzz").Run() + var err error + s.executor, err = newDockerExecutor("zzzzz-zzzzz-zzzzzzzzzzzzzzz", c.Logf, time.Second/2) + c.Assert(err, IsNil) + } +} diff --git a/lib/crunchrun/executor.go b/lib/crunchrun/executor.go new file mode 100644 index 0000000000..c773febe94 --- /dev/null +++ b/lib/crunchrun/executor.go @@ -0,0 +1,63 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 +package crunchrun + +import ( + "io" + + "golang.org/x/net/context" +) + +type bindmount struct { + HostPath string + ReadOnly bool +} + +type containerSpec struct { + Image string + VCPUs int + RAM int64 + WorkingDir string + Env map[string]string + BindMounts map[string]bindmount + Command []string + EnableNetwork bool + NetworkMode string // docker network mode, normally "default" + CgroupParent string + Stdin io.ReadCloser + Stdout io.WriteCloser + Stderr io.WriteCloser +} + +// containerExecutor is an interface to a container runtime +// (docker/singularity). +type containerExecutor interface { + // ImageLoaded determines whether the given image is already + // available to use without calling ImageLoad. + ImageLoaded(imageID string) bool + + // ImageLoad loads the image from the given tarball such that + // it can be used to create/start a container. + LoadImage(filename string) error + + // Wait for the container process to finish, and return its + // exit code. If applicable, also remove the stopped container + // before returning. + Wait(context.Context) (int, error) + + // Create a container, but don't start it yet. + Create(containerSpec) error + + // Start the container + Start() error + + // CID the container will belong to + CgroupID() string + + // Stop the container immediately + Stop() error + + // Release resources (temp dirs, stopped containers) + Close() +} diff --git a/lib/crunchrun/executor_test.go b/lib/crunchrun/executor_test.go new file mode 100644 index 0000000000..5728a9ccf0 --- /dev/null +++ b/lib/crunchrun/executor_test.go @@ -0,0 +1,161 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package crunchrun + +import ( + "bytes" + "io" + "io/ioutil" + "net/http" + "os" + "strings" + "time" + + "golang.org/x/net/context" + . "gopkg.in/check.v1" +) + +var _ = Suite(&dockerSuite{}) + +func busyboxDockerImage(c *C) string { + fnm := "busybox_uclibc.tar" + cachedir := "/var/lib/arvados/tmp" + cachefile := cachedir + "/" + fnm + if _, err := os.Stat(cachefile); err == nil { + return cachefile + } + + f, err := ioutil.TempFile(cachedir, "") + c.Assert(err, IsNil) + defer f.Close() + defer os.Remove(f.Name()) + + resp, err := http.Get("http://cache.arvados.org/" + fnm) + c.Assert(err, IsNil) + defer resp.Body.Close() + _, err = io.Copy(f, resp.Body) + c.Assert(err, IsNil) + err = f.Close() + c.Assert(err, IsNil) + err = os.Rename(f.Name(), cachefile) + c.Assert(err, IsNil) + + return cachefile +} + +type nopWriteCloser struct{ io.Writer } + +func (nopWriteCloser) Close() error { return nil } + +// embedded by dockerSuite and singularitySuite so they can share +// tests. 
+type executorSuite struct { + newExecutor func(*C) // embedding struct's SetUpSuite method must set this + executor containerExecutor + spec containerSpec + stdout bytes.Buffer + stderr bytes.Buffer +} + +func (s *executorSuite) SetUpTest(c *C) { + s.newExecutor(c) + s.stdout = bytes.Buffer{} + s.stderr = bytes.Buffer{} + s.spec = containerSpec{ + Image: "busybox:uclibc", + VCPUs: 1, + WorkingDir: "", + Env: map[string]string{"PATH": "/bin:/usr/bin"}, + NetworkMode: "default", + Stdout: nopWriteCloser{&s.stdout}, + Stderr: nopWriteCloser{&s.stderr}, + } + err := s.executor.LoadImage(busyboxDockerImage(c)) + c.Assert(err, IsNil) +} + +func (s *executorSuite) TearDownTest(c *C) { + s.executor.Close() +} + +func (s *executorSuite) TestExecTrivialContainer(c *C) { + s.spec.Command = []string{"echo", "ok"} + s.checkRun(c, 0) + c.Check(s.stdout.String(), Equals, "ok\n") + c.Check(s.stderr.String(), Equals, "") +} + +func (s *executorSuite) TestExecStop(c *C) { + s.spec.Command = []string{"sh", "-c", "sleep 10; echo ok"} + err := s.executor.Create(s.spec) + c.Assert(err, IsNil) + err = s.executor.Start() + c.Assert(err, IsNil) + go func() { + time.Sleep(time.Second / 10) + s.executor.Stop() + }() + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second)) + defer cancel() + code, err := s.executor.Wait(ctx) + c.Check(code, Not(Equals), 0) + c.Check(err, IsNil) + c.Check(s.stdout.String(), Equals, "") + c.Check(s.stderr.String(), Equals, "") +} + +func (s *executorSuite) TestExecCleanEnv(c *C) { + s.spec.Command = []string{"env"} + s.checkRun(c, 0) + c.Check(s.stderr.String(), Equals, "") + got := map[string]string{} + for _, kv := range strings.Split(s.stdout.String(), "\n") { + if kv == "" { + continue + } + kv := strings.SplitN(kv, "=", 2) + switch kv[0] { + case "HOSTNAME", "HOME": + // docker sets these by itself + case "LD_LIBRARY_PATH", "SINGULARITY_NAME", "PWD", "LANG", "SHLVL", "SINGULARITY_INIT", "SINGULARITY_CONTAINER": + // singularity sets these by itself (cf. 
https://sylabs.io/guides/3.5/user-guide/environment_and_metadata.html) + case "PROMPT_COMMAND", "PS1", "SINGULARITY_APPNAME": + // singularity also sets these by itself (as of v3.5.2) + default: + got[kv[0]] = kv[1] + } + } + c.Check(got, DeepEquals, s.spec.Env) +} +func (s *executorSuite) TestExecEnableNetwork(c *C) { + for _, enable := range []bool{false, true} { + s.SetUpTest(c) + s.spec.Command = []string{"ip", "route"} + s.spec.EnableNetwork = enable + s.checkRun(c, 0) + if enable { + c.Check(s.stdout.String(), Matches, "(?ms).*default via.*") + } else { + c.Check(s.stdout.String(), Equals, "") + } + } +} + +func (s *executorSuite) TestExecStdoutStderr(c *C) { + s.spec.Command = []string{"sh", "-c", "echo foo; echo -n bar >&2; echo baz; echo waz >&2"} + s.checkRun(c, 0) + c.Check(s.stdout.String(), Equals, "foo\nbaz\n") + c.Check(s.stderr.String(), Equals, "barwaz\n") +} + +func (s *executorSuite) checkRun(c *C, expectCode int) { + c.Assert(s.executor.Create(s.spec), IsNil) + c.Assert(s.executor.Start(), IsNil) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second)) + defer cancel() + code, err := s.executor.Wait(ctx) + c.Assert(err, IsNil) + c.Check(code, Equals, expectCode) +} diff --git a/lib/crunchrun/logging_test.go b/lib/crunchrun/logging_test.go index e3fa3af0bb..55460af379 100644 --- a/lib/crunchrun/logging_test.go +++ b/lib/crunchrun/logging_test.go @@ -45,7 +45,7 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) { api := &ArvTestClient{} kc := &KeepTestClient{} defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz") + cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp @@ -74,7 +74,7 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) { api := &ArvTestClient{} kc := &KeepTestClient{} defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz") + cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp cr.CrunchLog.Immediate = nil @@ -97,7 +97,7 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) { api := &ArvTestClient{} kc := &KeepTestClient{} defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz") + cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) ts := &TestTimestamper{} cr.CrunchLog.Timestamper = ts.Timestamp @@ -146,7 +146,7 @@ func (s *LoggingTestSuite) TestLogUpdate(c *C) { api := &ArvTestClient{} kc := &KeepTestClient{} defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz") + cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) ts := &TestTimestamper{} cr.CrunchLog.Timestamper = ts.Timestamp @@ -197,7 +197,7 @@ func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string api := &ArvTestClient{} kc := &KeepTestClient{} defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz") + cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp diff --git a/lib/crunchrun/singularity.go b/lib/crunchrun/singularity.go new file mode 100644 index 0000000000..d783baab9f --- /dev/null +++ b/lib/crunchrun/singularity.go @@ -0,0 +1,140 @@ +// 
Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"syscall"
+
+	"golang.org/x/net/context"
+)
+
+type singularityExecutor struct {
+	logf          func(string, ...interface{})
+	spec          containerSpec
+	tmpdir        string
+	child         *exec.Cmd
+	imageFilename string // "sif" image
+}
+
+func newSingularityExecutor(logf func(string, ...interface{})) (*singularityExecutor, error) {
+	tmpdir, err := ioutil.TempDir("", "crunch-run-singularity-")
+	if err != nil {
+		return nil, err
+	}
+	return &singularityExecutor{
+		logf:   logf,
+		tmpdir: tmpdir,
+	}, nil
+}
+
+func (e *singularityExecutor) ImageLoaded(string) bool {
+	return false
+}
+
+// LoadImage satisfies the containerExecutor interface: it transforms
+// the given docker image tarball into a SIF file for later use.
+func (e *singularityExecutor) LoadImage(imageTarballPath string) error {
+	e.logf("building singularity image")
+	e.imageFilename = e.tmpdir + "/image.sif"
+	build := exec.Command("singularity", "build", e.imageFilename, "docker-archive://"+imageTarballPath)
+	e.logf("%v", build.Args)
+	out, err := build.CombinedOutput()
+	// INFO:    Starting build...
+	// Getting image source signatures
+	// Copying blob ab15617702de done
+	// Copying config 651e02b8a2 done
+	// Writing manifest to image destination
+	// Storing signatures
+	// 2021/04/22 14:42:14  info unpack layer: sha256:21cbfd3a344c52b197b9fa36091e66d9cbe52232703ff78d44734f85abb7ccd3
+	// INFO:    Creating SIF file...
+	// INFO:    Build complete: arvados-jobs.latest.sif
+	e.logf("%s", out)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (e *singularityExecutor) Create(spec containerSpec) error {
+	e.spec = spec
+	return nil
+}
+
+func (e *singularityExecutor) Start() error {
+	args := []string{"singularity", "exec", "--containall", "--no-home", "--cleanenv"}
+	if !e.spec.EnableNetwork {
+		args = append(args, "--net", "--network=none")
+	}
+	readonlyflag := map[bool]string{
+		false: "rw",
+		true:  "ro",
+	}
+	for path, mount := range e.spec.BindMounts {
+		args = append(args, "--bind", mount.HostPath+":"+path+":"+readonlyflag[mount.ReadOnly])
+	}
+	args = append(args, e.imageFilename)
+	args = append(args, e.spec.Command...)
+
+	// This is for singularity 3.5.2.
There are some behaviors + // that will change in singularity 3.6, please see: + // https://sylabs.io/guides/3.7/user-guide/environment_and_metadata.html + // https://sylabs.io/guides/3.5/user-guide/environment_and_metadata.html + env := make([]string, 0, len(e.spec.Env)) + for k, v := range e.spec.Env { + env = append(env, "SINGULARITYENV_"+k+"="+v) + } + + path, err := exec.LookPath(args[0]) + if err != nil { + return err + } + child := &exec.Cmd{ + Path: path, + Args: args, + Env: env, + Stdin: e.spec.Stdin, + Stdout: e.spec.Stdout, + Stderr: e.spec.Stderr, + } + err = child.Start() + if err != nil { + return err + } + e.child = child + return nil +} + +func (e *singularityExecutor) CgroupID() string { + return "" +} + +func (e *singularityExecutor) Stop() error { + if err := e.child.Process.Signal(syscall.Signal(0)); err != nil { + // process already exited + return nil + } + return e.child.Process.Signal(syscall.SIGKILL) +} + +func (e *singularityExecutor) Wait(context.Context) (int, error) { + err := e.child.Wait() + if err, ok := err.(*exec.ExitError); ok { + return err.ProcessState.ExitCode(), nil + } + if err != nil { + return 0, err + } + return e.child.ProcessState.ExitCode(), nil +} + +func (e *singularityExecutor) Close() { + err := os.RemoveAll(e.tmpdir) + if err != nil { + e.logf("error removing temp dir: %s", err) + } +} diff --git a/lib/crunchrun/singularity_test.go b/lib/crunchrun/singularity_test.go new file mode 100644 index 0000000000..a1263da321 --- /dev/null +++ b/lib/crunchrun/singularity_test.go @@ -0,0 +1,29 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package crunchrun + +import ( + "os/exec" + + . "gopkg.in/check.v1" +) + +var _ = Suite(&singularitySuite{}) + +type singularitySuite struct { + executorSuite +} + +func (s *singularitySuite) SetUpSuite(c *C) { + _, err := exec.LookPath("singularity") + if err != nil { + c.Skip("looks like singularity is not installed") + } + s.newExecutor = func(c *C) { + var err error + s.executor, err = newSingularityExecutor(c.Logf) + c.Assert(err, IsNil) + } +} diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go index 8752ee0544..829a053636 100644 --- a/lib/dispatchcloud/dispatcher_test.go +++ b/lib/dispatchcloud/dispatcher_test.go @@ -56,6 +56,7 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { CrunchRunArgumentsList: []string{"--foo", "--extra='args'"}, DispatchPrivateKey: string(dispatchprivraw), StaleLockTimeout: arvados.Duration(5 * time.Millisecond), + RuntimeEngine: "stub", CloudVMs: arvados.CloudVMsConfig{ Driver: "test", SyncInterval: arvados.Duration(10 * time.Millisecond), @@ -163,7 +164,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond))) stubvm.ExecuteContainer = executeContainer stubvm.CrashRunningContainer = finishContainer - stubvm.ExtraCrunchRunArgs = "'--foo' '--extra='\\''args'\\'''" + stubvm.ExtraCrunchRunArgs = "'--runtime-engine=stub' '--foo' '--extra='\\''args'\\'''" switch n % 7 { case 0: stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond) diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 7289179fd6..a5924cf997 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -122,7 +122,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe installPublicKey: installPublicKey, 
tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix, runnerCmdDefault: cluster.Containers.CrunchRunCommand, - runnerArgs: cluster.Containers.CrunchRunArgumentsList, + runnerArgs: append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...), stop: make(chan bool), } wp.registerMetrics(reg) diff --git a/lib/install/deps.go b/lib/install/deps.go index 8df3fba532..255e56aaf5 100644 --- a/lib/install/deps.go +++ b/lib/install/deps.go @@ -181,6 +181,11 @@ func (inst *installCommand) RunCommand(prog string, args []string, stdin io.Read "wget", "xvfb", ) + if dev || test { + pkgs = append(pkgs, + "squashfs-tools", // for singularity + ) + } switch { case osv.Debian && osv.Major >= 10: pkgs = append(pkgs, "libcurl4") @@ -315,6 +320,28 @@ rm ${zip} } } + singularityversion := "3.5.2" + if havesingularityversion, err := exec.Command("/var/lib/arvados/bin/singularity", "--version").CombinedOutput(); err == nil && strings.Contains(string(havesingularityversion), singularityversion) { + logger.Print("singularity " + singularityversion + " already installed") + } else if dev || test { + err = inst.runBash(` +S=`+singularityversion+` +tmp=/var/lib/arvados/tmp/singularity +trap "rm -r ${tmp}" ERR EXIT +cd /var/lib/arvados/tmp +git clone https://github.com/sylabs/singularity +cd singularity +git checkout v${S} +./mconfig --prefix=/var/lib/arvados +make -C ./builddir +make -C ./builddir install +rm -r ${tmp} +`, stdout, stderr) + if err != nil { + return 1 + } + } + // The entry in /etc/locale.gen is "en_US.UTF-8"; once // it's installed, locale -a reports it as // "en_US.utf8". diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index 2c6db42d13..2ff4323218 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -414,6 +414,7 @@ type ContainersConfig struct { StaleLockTimeout Duration SupportedDockerImageFormats StringSet UsePreemptibleInstances bool + RuntimeEngine string JobsAPI struct { Enable string diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go index aeb5a47e6d..16e2098165 100644 --- a/sdk/go/arvadostest/fixtures.go +++ b/sdk/go/arvadostest/fixtures.go @@ -96,6 +96,9 @@ const ( LogCollectionUUID = "zzzzz-4zz18-logcollection01" LogCollectionUUID2 = "zzzzz-4zz18-logcollection02" + + DockerImage112PDH = "d740a57097711e08eb9b2a93518f20ab+174" + DockerImage112Filename = "sha256:d8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678.tar" ) // PathologicalManifest : A valid manifest designed to test -- 2.30.2