X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/56e130608f8977d20b21c54f6ab8973d71e045a0..40f551004ab4e5f1d8ab02ddb55dca225ee8f6ac:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index b252e0dce1..a5e69387ec 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -6,6 +6,7 @@ package crunchrun import ( "bytes" + "context" "encoding/json" "errors" "flag" @@ -13,6 +14,8 @@ import ( "io" "io/ioutil" "log" + "net" + "net/http" "os" "os/exec" "os/signal" @@ -33,19 +36,20 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/keepclient" "git.arvados.org/arvados.git/sdk/go/manifest" - "golang.org/x/crypto/ssh" - "golang.org/x/net/context" - - dockertypes "github.com/docker/docker/api/types" - dockercontainer "github.com/docker/docker/api/types/container" - dockernetwork "github.com/docker/docker/api/types/network" - dockerclient "github.com/docker/docker/client" ) type command struct{} var Command = command{} +// ConfigData contains environment variables and (when needed) cluster +// configuration, passed from dispatchcloud to crunch-run on stdin. +type ConfigData struct { + Env map[string]string + KeepBuffers int + Cluster *arvados.Cluster +} + // IArvadosClient is the minimal Arvados API methods used by crunch-run. type IArvadosClient interface { Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error @@ -61,34 +65,21 @@ var ErrCancelled = errors.New("Cancelled") // IKeepClient is the minimal Keep API methods used by crunch-run. type IKeepClient interface { - PutB(buf []byte) (string, int, error) + BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) ReadAt(locator string, p []byte, off int) (int, error) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) LocalLocator(locator string) (string, error) ClearBlockCache() + SetStorageClasses(sc []string) } // NewLogWriter is a factory function to create a new log writer. type NewLogWriter func(name string) (io.WriteCloser, error) -type RunArvMount func(args []string, tok string) (*exec.Cmd, error) +type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error) type MkTempDir func(string, string) (string, error) -// ThinDockerClient is the minimal Docker client interface used by crunch-run. -type ThinDockerClient interface { - ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) - ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, - networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) - ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error - ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error - ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) - ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error) - ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) - ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) - ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) -} - type PsProcess interface { CmdlineSlice() ([]string, error) } @@ -96,7 +87,10 @@ type PsProcess interface { // ContainerRunner is the main stateful struct used for a single execution of a // container. type ContainerRunner struct { - Docker ThinDockerClient + executor containerExecutor + executorStdin io.Closer + executorStdout io.Closer + executorStderr io.Closer // Dispatcher client is initialized with the Dispatcher token. // This is a privileged token used to manage container status @@ -120,36 +114,31 @@ type ContainerRunner struct { ContainerArvClient IArvadosClient ContainerKeepClient IKeepClient - Container arvados.Container - ContainerConfig dockercontainer.Config - HostConfig dockercontainer.HostConfig - token string - ContainerID string - ExitCode *int - NewLogWriter NewLogWriter - loggingDone chan bool - CrunchLog *ThrottledLogger - Stdout io.WriteCloser - Stderr io.WriteCloser - logUUID string - logMtx sync.Mutex - LogCollection arvados.CollectionFileSystem - LogsPDH *string - RunArvMount RunArvMount - MkTempDir MkTempDir - ArvMount *exec.Cmd - ArvMountPoint string - HostOutputDir string - Binds []string - Volumes map[string]struct{} - OutputPDH *string - SigChan chan os.Signal - ArvMountExit chan error - SecretMounts map[string]arvados.Mount - MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) - finalState string - parentTemp string - + Container arvados.Container + token string + ExitCode *int + NewLogWriter NewLogWriter + CrunchLog *ThrottledLogger + logUUID string + logMtx sync.Mutex + LogCollection arvados.CollectionFileSystem + LogsPDH *string + RunArvMount RunArvMount + MkTempDir MkTempDir + ArvMount *exec.Cmd + ArvMountPoint string + HostOutputDir string + Volumes map[string]struct{} + OutputPDH *string + SigChan chan os.Signal + ArvMountExit chan error + SecretMounts map[string]arvados.Mount + MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) + finalState string + parentTemp string + + keepstoreLogger io.WriteCloser + keepstoreLogbuf *bufThenWrite statLogger io.WriteCloser statReporter *crunchstat.Reporter hoststatLogger io.WriteCloser @@ -172,21 +161,20 @@ type ContainerRunner struct { cStateLock sync.Mutex cCancelled bool // StopContainer() invoked - cRemoved bool // docker confirmed the container no longer exists - enableNetwork string // one of "default" or "always" - networkMode string // passed through to HostConfig.NetworkMode - arvMountLog *ThrottledLogger + enableMemoryLimit bool + enableNetwork string // one of "default" or "always" + networkMode string // "none", "host", or "" -- passed through to executor + arvMountLog *ThrottledLogger containerWatchdogInterval time.Duration - gatewayAddress string - gatewaySSHConfig *ssh.ServerConfig - gatewayAuthSecret string + gateway Gateway } -// setupSignals sets up signal handling to gracefully terminate the underlying -// Docker container and update state when receiving a TERM, INT or QUIT signal. +// setupSignals sets up signal handling to gracefully terminate the +// underlying container and update state when receiving a TERM, INT or +// QUIT signal. func (runner *ContainerRunner) setupSignals() { runner.SigChan = make(chan os.Signal, 1) signal.Notify(runner.SigChan, syscall.SIGTERM) @@ -200,24 +188,18 @@ func (runner *ContainerRunner) setupSignals() { }(runner.SigChan) } -// stop the underlying Docker container. +// stop the underlying container. func (runner *ContainerRunner) stop(sig os.Signal) { runner.cStateLock.Lock() defer runner.cStateLock.Unlock() if sig != nil { runner.CrunchLog.Printf("caught signal: %v", sig) } - if runner.ContainerID == "" { - return - } runner.cCancelled = true - runner.CrunchLog.Printf("removing container") - err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true}) + runner.CrunchLog.Printf("stopping container") + err := runner.executor.Stop() if err != nil { - runner.CrunchLog.Printf("error removing container: %s", err) - } - if err == nil || strings.Contains(err.Error(), "No such container: "+runner.ContainerID) { - runner.cRemoved = true + runner.CrunchLog.Printf("error stopping container: %s", err) } } @@ -265,61 +247,46 @@ func (runner *ContainerRunner) checkBrokenNode(goterr error) bool { // LoadImage determines the docker image id from the container record and // checks if it is available in the local Docker image store. If not, it loads // the image from Keep. -func (runner *ContainerRunner) LoadImage() (err error) { - +func (runner *ContainerRunner) LoadImage() (string, error) { runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage) - var collection arvados.Collection - err = runner.ContainerArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection) + d, err := os.Open(runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage) if err != nil { - return fmt.Errorf("While getting container image collection: %v", err) + return "", err + } + defer d.Close() + allfiles, err := d.Readdirnames(-1) + if err != nil { + return "", err } - manifest := manifest.Manifest{Text: collection.ManifestText} - var img, imageID string - for ms := range manifest.StreamIter() { - img = ms.FileStreamSegments[0].Name - if !strings.HasSuffix(img, ".tar") { - return fmt.Errorf("First file in the container image collection does not end in .tar") + var tarfiles []string + for _, fnm := range allfiles { + if strings.HasSuffix(fnm, ".tar") { + tarfiles = append(tarfiles, fnm) } - imageID = img[:len(img)-4] } + if len(tarfiles) == 0 { + return "", fmt.Errorf("image collection does not include a .tar image file") + } + if len(tarfiles) > 1 { + return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles) + } + imageID := tarfiles[0][:len(tarfiles[0])-4] + imageTarballPath := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + imageID + ".tar" + runner.CrunchLog.Printf("Using Docker image id %q", imageID) - runner.CrunchLog.Printf("Using Docker image id '%s'", imageID) - - _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID) + runner.CrunchLog.Print("Loading Docker image from keep") + err = runner.executor.LoadImage(imageID, imageTarballPath, runner.Container, runner.ArvMountPoint, + runner.containerClient) if err != nil { - runner.CrunchLog.Print("Loading Docker image from keep") - - var readCloser io.ReadCloser - readCloser, err = runner.ContainerKeepClient.ManifestFileReader(manifest, img) - if err != nil { - return fmt.Errorf("While creating ManifestFileReader for container image: %v", err) - } - - response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true) - if err != nil { - return fmt.Errorf("While loading container image into Docker: %v", err) - } - - defer response.Body.Close() - rbody, err := ioutil.ReadAll(response.Body) - if err != nil { - return fmt.Errorf("Reading response to image load: %v", err) - } - runner.CrunchLog.Printf("Docker response: %s", rbody) - } else { - runner.CrunchLog.Print("Docker image is available") + return "", err } - runner.ContainerConfig.Image = imageID - - runner.ContainerKeepClient.ClearBlockCache() - - return nil + return imageID, nil } -func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) { - c = exec.Command("arv-mount", arvMountCmd...) +func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) { + c = exec.Command(cmdline[0], cmdline[1:]...) // Copy our environment, but override ARVADOS_API_TOKEN with // the container auth token. @@ -336,8 +303,16 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) ( return nil, err } runner.arvMountLog = NewThrottledLogger(w) + scanner := logScanner{ + Patterns: []string{ + "Keep write error", + "Block not found error", + "Unhandled exception during FUSE operation", + }, + ReportFunc: runner.reportArvMountWarning, + } c.Stdout = runner.arvMountLog - c.Stderr = runner.arvMountLog + c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner) runner.CrunchLog.Printf("Running %v", c.Args) @@ -421,32 +396,37 @@ func copyfile(src string, dst string) (err error) { return nil } -func (runner *ContainerRunner) SetupMounts() (err error) { - err = runner.SetupArvMountPoint("keep") +func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { + bindmounts := map[string]bindmount{} + err := runner.SetupArvMountPoint("keep") if err != nil { - return fmt.Errorf("While creating keep mount temp dir: %v", err) + return nil, fmt.Errorf("While creating keep mount temp dir: %v", err) } token, err := runner.ContainerToken() if err != nil { - return fmt.Errorf("could not get container token: %s", err) + return nil, fmt.Errorf("could not get container token: %s", err) } + runner.CrunchLog.Printf("container token %q", token) pdhOnly := true tmpcount := 0 arvMountCmd := []string{ + "arv-mount", "--foreground", - "--allow-other", "--read-write", + "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","), fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())} + if runner.executor.Runtime() == "docker" { + arvMountCmd = append(arvMountCmd, "--allow-other") + } + if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 { arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM)) } collectionPaths := []string{} - runner.Binds = nil - runner.Volumes = make(map[string]struct{}) needCertMount := true type copyFile struct { src string @@ -460,11 +440,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } for bind := range runner.SecretMounts { if _, ok := runner.Container.Mounts[bind]; ok { - return fmt.Errorf("secret mount %q conflicts with regular mount", bind) + return nil, fmt.Errorf("secret mount %q conflicts with regular mount", bind) } if runner.SecretMounts[bind].Kind != "json" && runner.SecretMounts[bind].Kind != "text" { - return fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted", + return nil, fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted", bind, runner.SecretMounts[bind].Kind) } binds = append(binds, bind) @@ -479,7 +459,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if bind == "stdout" || bind == "stderr" { // Is it a "file" mount kind? if mnt.Kind != "file" { - return fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind) + return nil, fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind) } // Does path start with OutputPath? @@ -488,14 +468,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) { prefix += "/" } if !strings.HasPrefix(mnt.Path, prefix) { - return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix) + return nil, fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix) } } if bind == "stdin" { // Is it a "collection" mount kind? if mnt.Kind != "collection" && mnt.Kind != "json" { - return fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind) + return nil, fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind) } } @@ -505,7 +485,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" { if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" { - return fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind) + return nil, fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind) } } @@ -513,17 +493,17 @@ func (runner *ContainerRunner) SetupMounts() (err error) { case mnt.Kind == "collection" && bind != "stdin": var src string if mnt.UUID != "" && mnt.PortableDataHash != "" { - return fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount") + return nil, fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount") } if mnt.UUID != "" { if mnt.Writable { - return fmt.Errorf("writing to existing collections currently not permitted") + return nil, fmt.Errorf("writing to existing collections currently not permitted") } pdhOnly = false src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID) } else if mnt.PortableDataHash != "" { if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") { - return fmt.Errorf("can never write to a collection specified by portable data hash") + return nil, fmt.Errorf("can never write to a collection specified by portable data hash") } idx := strings.Index(mnt.PortableDataHash, "/") if idx > 0 { @@ -549,14 +529,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if mnt.Writable { if bind == runner.Container.OutputPath { runner.HostOutputDir = src - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind)) + bindmounts[bind] = bindmount{HostPath: src} } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") { copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]}) } else { - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind)) + bindmounts[bind] = bindmount{HostPath: src} } } else { - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind)) + bindmounts[bind] = bindmount{HostPath: src, ReadOnly: true} } collectionPaths = append(collectionPaths, src) @@ -564,17 +544,17 @@ func (runner *ContainerRunner) SetupMounts() (err error) { var tmpdir string tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp") if err != nil { - return fmt.Errorf("while creating mount temp dir: %v", err) + return nil, fmt.Errorf("while creating mount temp dir: %v", err) } st, staterr := os.Stat(tmpdir) if staterr != nil { - return fmt.Errorf("while Stat on temp dir: %v", staterr) + return nil, fmt.Errorf("while Stat on temp dir: %v", staterr) } err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777) if staterr != nil { - return fmt.Errorf("while Chmod temp dir: %v", err) + return nil, fmt.Errorf("while Chmod temp dir: %v", err) } - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind)) + bindmounts[bind] = bindmount{HostPath: tmpdir} if bind == runner.Container.OutputPath { runner.HostOutputDir = tmpdir } @@ -584,53 +564,53 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if mnt.Kind == "json" { filedata, err = json.Marshal(mnt.Content) if err != nil { - return fmt.Errorf("encoding json data: %v", err) + return nil, fmt.Errorf("encoding json data: %v", err) } } else { text, ok := mnt.Content.(string) if !ok { - return fmt.Errorf("content for mount %q must be a string", bind) + return nil, fmt.Errorf("content for mount %q must be a string", bind) } filedata = []byte(text) } tmpdir, err := runner.MkTempDir(runner.parentTemp, mnt.Kind) if err != nil { - return fmt.Errorf("creating temp dir: %v", err) + return nil, fmt.Errorf("creating temp dir: %v", err) } tmpfn := filepath.Join(tmpdir, "mountdata."+mnt.Kind) err = ioutil.WriteFile(tmpfn, filedata, 0444) if err != nil { - return fmt.Errorf("writing temp file: %v", err) + return nil, fmt.Errorf("writing temp file: %v", err) } if strings.HasPrefix(bind, runner.Container.OutputPath+"/") { copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]}) } else { - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind)) + bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true} } case mnt.Kind == "git_tree": tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree") if err != nil { - return fmt.Errorf("creating temp dir: %v", err) + return nil, fmt.Errorf("creating temp dir: %v", err) } err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token) if err != nil { - return err + return nil, err } - runner.Binds = append(runner.Binds, tmpdir+":"+bind+":ro") + bindmounts[bind] = bindmount{HostPath: tmpdir, ReadOnly: true} } } if runner.HostOutputDir == "" { - return fmt.Errorf("output path does not correspond to a writable mount point") + return nil, fmt.Errorf("output path does not correspond to a writable mount point") } - if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI { + if needCertMount && runner.Container.RuntimeConstraints.API { for _, certfile := range arvadosclient.CertFiles { _, err := os.Stat(certfile) if err == nil { - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile)) + bindmounts["/etc/arvados/ca-certificates.crt"] = bindmount{HostPath: certfile, ReadOnly: true} break } } @@ -641,24 +621,25 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } else { arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id") } + arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid") arvMountCmd = append(arvMountCmd, runner.ArvMountPoint) runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token) if err != nil { - return fmt.Errorf("while trying to start arv-mount: %v", err) + return nil, fmt.Errorf("while trying to start arv-mount: %v", err) } for _, p := range collectionPaths { _, err = os.Stat(p) if err != nil { - return fmt.Errorf("while checking that input files exist: %v", err) + return nil, fmt.Errorf("while checking that input files exist: %v", err) } } for _, cp := range copyFiles { st, err := os.Stat(cp.src) if err != nil { - return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err) + return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err) } if st.IsDir() { err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error { @@ -689,59 +670,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } } if err != nil { - return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err) + return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err) } } - return nil -} - -func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) { - // Handle docker log protocol - // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container - defer close(runner.loggingDone) - - header := make([]byte, 8) - var err error - for err == nil { - _, err = io.ReadAtLeast(containerReader, header, 8) - if err != nil { - if err == io.EOF { - err = nil - } - break - } - readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24) - if header[0] == 1 { - // stdout - _, err = io.CopyN(runner.Stdout, containerReader, readsize) - } else { - // stderr - _, err = io.CopyN(runner.Stderr, containerReader, readsize) - } - } - - if err != nil { - runner.CrunchLog.Printf("error reading docker logs: %v", err) - } - - err = runner.Stdout.Close() - if err != nil { - runner.CrunchLog.Printf("error closing stdout logs: %v", err) - } - - err = runner.Stderr.Close() - if err != nil { - runner.CrunchLog.Printf("error closing stderr logs: %v", err) - } - - if runner.statReporter != nil { - runner.statReporter.Stop() - err = runner.statLogger.Close() - if err != nil { - runner.CrunchLog.Printf("error closing crunchstat logs: %v", err) - } - } + return bindmounts, nil } func (runner *ContainerRunner) stopHoststat() error { @@ -778,7 +711,7 @@ func (runner *ContainerRunner) startCrunchstat() error { } runner.statLogger = NewThrottledLogger(w) runner.statReporter = &crunchstat.Reporter{ - CID: runner.ContainerID, + CID: runner.executor.CgroupID(), Logger: log.New(runner.statLogger, "", 0), CgroupParent: runner.expectCgroupParent, CgroupRoot: runner.cgroupRoot, @@ -941,102 +874,6 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str return true, nil } -// AttachStreams connects the docker container stdin, stdout and stderr logs -// to the Arvados logger which logs to Keep and the API server logs table. -func (runner *ContainerRunner) AttachStreams() (err error) { - - runner.CrunchLog.Print("Attaching container streams") - - // If stdin mount is provided, attach it to the docker container - var stdinRdr arvados.File - var stdinJSON []byte - if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok { - if stdinMnt.Kind == "collection" { - var stdinColl arvados.Collection - collID := stdinMnt.UUID - if collID == "" { - collID = stdinMnt.PortableDataHash - } - err = runner.ContainerArvClient.Get("collections", collID, nil, &stdinColl) - if err != nil { - return fmt.Errorf("While getting stdin collection: %v", err) - } - - stdinRdr, err = runner.ContainerKeepClient.ManifestFileReader( - manifest.Manifest{Text: stdinColl.ManifestText}, - stdinMnt.Path) - if os.IsNotExist(err) { - return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path) - } else if err != nil { - return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err) - } - } else if stdinMnt.Kind == "json" { - stdinJSON, err = json.Marshal(stdinMnt.Content) - if err != nil { - return fmt.Errorf("While encoding stdin json data: %v", err) - } - } - } - - stdinUsed := stdinRdr != nil || len(stdinJSON) != 0 - response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID, - dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true}) - if err != nil { - return fmt.Errorf("While attaching container stdout/stderr streams: %v", err) - } - - runner.loggingDone = make(chan bool) - - if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok { - stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path) - if err != nil { - return err - } - runner.Stdout = stdoutFile - } else if w, err := runner.NewLogWriter("stdout"); err != nil { - return err - } else { - runner.Stdout = NewThrottledLogger(w) - } - - if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok { - stderrFile, err := runner.getStdoutFile(stderrMnt.Path) - if err != nil { - return err - } - runner.Stderr = stderrFile - } else if w, err := runner.NewLogWriter("stderr"); err != nil { - return err - } else { - runner.Stderr = NewThrottledLogger(w) - } - - if stdinRdr != nil { - go func() { - _, err := io.Copy(response.Conn, stdinRdr) - if err != nil { - runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err) - runner.stop(nil) - } - stdinRdr.Close() - response.CloseWrite() - }() - } else if len(stdinJSON) != 0 { - go func() { - _, err := io.Copy(response.Conn, bytes.NewReader(stdinJSON)) - if err != nil { - runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err) - runner.stop(nil) - } - response.CloseWrite() - }() - } - - go runner.ProcessDockerAttach(response.Reader) - - return nil -} - func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { stdoutPath := mntPath[len(runner.Container.OutputPath):] index := strings.LastIndex(stdoutPath, "/") @@ -1063,86 +900,113 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { } // CreateContainer creates the docker container. -func (runner *ContainerRunner) CreateContainer() error { - runner.CrunchLog.Print("Creating Docker container") - - runner.ContainerConfig.Cmd = runner.Container.Command - if runner.Container.Cwd != "." { - runner.ContainerConfig.WorkingDir = runner.Container.Cwd - } - - for k, v := range runner.Container.Environment { - runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v) - } - - runner.ContainerConfig.Volumes = runner.Volumes - - maxRAM := int64(runner.Container.RuntimeConstraints.RAM) - minDockerRAM := int64(16) - if maxRAM < minDockerRAM*1024*1024 { - // Docker daemon won't let you set a limit less than ~10 MiB - maxRAM = minDockerRAM * 1024 * 1024 - } - runner.HostConfig = dockercontainer.HostConfig{ - Binds: runner.Binds, - LogConfig: dockercontainer.LogConfig{ - Type: "none", - }, - Resources: dockercontainer.Resources{ - CgroupParent: runner.setCgroupParent, - NanoCPUs: int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000, - Memory: maxRAM, // RAM - MemorySwap: maxRAM, // RAM+swap - KernelMemory: maxRAM, // kernel portion - }, +func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error { + var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil)) + if mnt, ok := runner.Container.Mounts["stdin"]; ok { + switch mnt.Kind { + case "collection": + var collID string + if mnt.UUID != "" { + collID = mnt.UUID + } else { + collID = mnt.PortableDataHash + } + path := runner.ArvMountPoint + "/by_id/" + collID + "/" + mnt.Path + f, err := os.Open(path) + if err != nil { + return err + } + stdin = f + case "json": + j, err := json.Marshal(mnt.Content) + if err != nil { + return fmt.Errorf("error encoding stdin json data: %v", err) + } + stdin = ioutil.NopCloser(bytes.NewReader(j)) + default: + return fmt.Errorf("stdin mount has unsupported kind %q", mnt.Kind) + } } - if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI { - tok, err := runner.ContainerToken() + var stdout, stderr io.WriteCloser + if mnt, ok := runner.Container.Mounts["stdout"]; ok { + f, err := runner.getStdoutFile(mnt.Path) if err != nil { return err } - runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, - "ARVADOS_API_TOKEN="+tok, - "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"), - "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"), - ) - runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode) + stdout = f + } else if w, err := runner.NewLogWriter("stdout"); err != nil { + return err } else { - if runner.enableNetwork == "always" { - runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode) - } else { - runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none") - } + stdout = NewThrottledLogger(w) } - _, stdinUsed := runner.Container.Mounts["stdin"] - runner.ContainerConfig.OpenStdin = stdinUsed - runner.ContainerConfig.StdinOnce = stdinUsed - runner.ContainerConfig.AttachStdin = stdinUsed - runner.ContainerConfig.AttachStdout = true - runner.ContainerConfig.AttachStderr = true - - createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID) - if err != nil { - return fmt.Errorf("While creating container: %v", err) + if mnt, ok := runner.Container.Mounts["stderr"]; ok { + f, err := runner.getStdoutFile(mnt.Path) + if err != nil { + return err + } + stderr = f + } else if w, err := runner.NewLogWriter("stderr"); err != nil { + return err + } else { + stderr = NewThrottledLogger(w) } - runner.ContainerID = createdBody.ID - - return runner.AttachStreams() + env := runner.Container.Environment + enableNetwork := runner.enableNetwork == "always" + if runner.Container.RuntimeConstraints.API { + enableNetwork = true + tok, err := runner.ContainerToken() + if err != nil { + return err + } + env = map[string]string{} + for k, v := range runner.Container.Environment { + env[k] = v + } + env["ARVADOS_API_TOKEN"] = tok + env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST") + env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE") + } + workdir := runner.Container.Cwd + if workdir == "." { + // both "" and "." mean default + workdir = "" + } + ram := runner.Container.RuntimeConstraints.RAM + if !runner.enableMemoryLimit { + ram = 0 + } + runner.executorStdin = stdin + runner.executorStdout = stdout + runner.executorStderr = stderr + return runner.executor.Create(containerSpec{ + Image: imageID, + VCPUs: runner.Container.RuntimeConstraints.VCPUs, + RAM: ram, + WorkingDir: workdir, + Env: env, + BindMounts: bindmounts, + Command: runner.Container.Command, + EnableNetwork: enableNetwork, + NetworkMode: runner.networkMode, + CgroupParent: runner.setCgroupParent, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + }) } // StartContainer starts the docker container created by CreateContainer. func (runner *ContainerRunner) StartContainer() error { - runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID) + runner.CrunchLog.Printf("Starting container") runner.cStateLock.Lock() defer runner.cStateLock.Unlock() if runner.cCancelled { return ErrCancelled } - err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID, - dockertypes.ContainerStartOptions{}) + err := runner.executor.Start() if err != nil { var advice string if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil { @@ -1156,71 +1020,60 @@ func (runner *ContainerRunner) StartContainer() error { // WaitFinish waits for the container to terminate, capture the exit code, and // close the stdout/stderr logging. func (runner *ContainerRunner) WaitFinish() error { - var runTimeExceeded <-chan time.Time runner.CrunchLog.Print("Waiting for container to finish") - - waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning) - arvMountExit := runner.ArvMountExit - if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 { - runTimeExceeded = time.After(time.Duration(timeout) * time.Second) + var timeout <-chan time.Time + if s := runner.Container.SchedulingParameters.MaxRunTime; s > 0 { + timeout = time.After(time.Duration(s) * time.Second) } - - containerGone := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { - defer close(containerGone) - if runner.containerWatchdogInterval < 1 { - runner.containerWatchdogInterval = time.Minute - } - for range time.NewTicker(runner.containerWatchdogInterval).C { - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval)) - ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID) - cancel() - runner.cStateLock.Lock() - done := runner.cRemoved || runner.ExitCode != nil - runner.cStateLock.Unlock() - if done { - return - } else if err != nil { - runner.CrunchLog.Printf("Error inspecting container: %s", err) - runner.checkBrokenNode(err) - return - } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") { - runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State) - return - } - } - }() - - for { select { - case waitBody := <-waitOk: - runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode) - code := int(waitBody.StatusCode) - runner.ExitCode = &code - - // wait for stdout/stderr to complete - <-runner.loggingDone - return nil - - case err := <-waitErr: - return fmt.Errorf("container wait: %v", err) - - case <-arvMountExit: - runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.") - runner.stop(nil) - // arvMountExit will always be ready now that - // it's closed, but that doesn't interest us. - arvMountExit = nil - - case <-runTimeExceeded: + case <-timeout: runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.") runner.stop(nil) - runTimeExceeded = nil + case <-runner.ArvMountExit: + runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.") + runner.stop(nil) + case <-ctx.Done(): + } + }() + exitcode, err := runner.executor.Wait(ctx) + if err != nil { + runner.checkBrokenNode(err) + return err + } + runner.ExitCode = &exitcode + + var returnErr error + if err = runner.executorStdin.Close(); err != nil { + err = fmt.Errorf("error closing container stdin: %s", err) + runner.CrunchLog.Printf("%s", err) + returnErr = err + } + if err = runner.executorStdout.Close(); err != nil { + err = fmt.Errorf("error closing container stdout: %s", err) + runner.CrunchLog.Printf("%s", err) + if returnErr == nil { + returnErr = err + } + } + if err = runner.executorStderr.Close(); err != nil { + err = fmt.Errorf("error closing container stderr: %s", err) + runner.CrunchLog.Printf("%s", err) + if returnErr == nil { + returnErr = err + } + } - case <-containerGone: - return errors.New("docker client never returned status") + if runner.statReporter != nil { + runner.statReporter.Stop() + err = runner.statLogger.Close() + if err != nil { + runner.CrunchLog.Printf("error closing crunchstat logs: %v", err) } } + return returnErr } func (runner *ContainerRunner) updateLogs() { @@ -1271,10 +1124,25 @@ func (runner *ContainerRunner) updateLogs() { } } +func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) { + var updated arvados.Container + err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ + "container": arvadosclient.Dict{ + "runtime_status": arvadosclient.Dict{ + "warning": "arv-mount: " + pattern, + "warningDetail": text, + }, + }, + }, &updated) + if err != nil { + runner.CrunchLog.Printf("error updating container runtime_status: %s", err) + } +} + // CaptureOutput saves data from the container's output directory if // needed, and updates the container output accordingly. -func (runner *ContainerRunner) CaptureOutput() error { - if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI { +func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error { + if runner.Container.RuntimeConstraints.API { // Output may have been set directly by the container, so // refresh the container record to check. err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID, @@ -1295,7 +1163,7 @@ func (runner *ContainerRunner) CaptureOutput() error { keepClient: runner.ContainerKeepClient, hostOutputDir: runner.HostOutputDir, ctrOutputDir: runner.Container.OutputPath, - binds: runner.Binds, + bindmounts: bindmounts, mounts: runner.Container.Mounts, secretMounts: runner.SecretMounts, logger: runner.CrunchLog, @@ -1341,6 +1209,7 @@ func (runner *ContainerRunner) CleanupDirs() { if umnterr != nil { runner.CrunchLog.Printf("Error unmounting: %v", umnterr) + runner.ArvMount.Process.Kill() } else { // If arv-mount --unmount gets stuck for any reason, we // don't want to wait for it forever. Do Wait() in a goroutine @@ -1371,12 +1240,14 @@ func (runner *ContainerRunner) CleanupDirs() { } } } + runner.ArvMount = nil } if runner.ArvMountPoint != "" { if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil { runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr) } + runner.ArvMountPoint = "" } if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil { @@ -1411,6 +1282,16 @@ func (runner *ContainerRunner) CommitLogs() error { runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0) }() + if runner.keepstoreLogger != nil { + // Flush any buffered logs from our local keepstore + // process. Discard anything logged after this point + // -- it won't end up in the log collection, so + // there's no point writing it to the collectionfs. + runner.keepstoreLogbuf.SetWriter(io.Discard) + runner.keepstoreLogger.Close() + runner.keepstoreLogger = nil + } + if runner.LogsPDH != nil { // If we have already assigned something to LogsPDH, // we must be closing the re-opened log, which won't @@ -1419,6 +1300,7 @@ func (runner *ContainerRunner) CommitLogs() error { // -- it exists only to send logs to other channels. return nil } + saved, err := runner.saveLogCollection(true) if err != nil { return fmt.Errorf("error saving log collection: %s", err) @@ -1436,15 +1318,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C // Already finalized. return } - mt, err := runner.LogCollection.MarshalManifest(".") - if err != nil { - err = fmt.Errorf("error creating log manifest: %v", err) - return - } updates := arvadosclient.Dict{ - "name": "logs for " + runner.Container.UUID, - "manifest_text": mt, + "name": "logs for " + runner.Container.UUID, } + mt, err1 := runner.LogCollection.MarshalManifest(".") + if err1 == nil { + // Only send updated manifest text if there was no + // error. + updates["manifest_text"] = mt + } + + // Even if flushing the manifest had an error, we still want + // to update the log record, if possible, to push the trash_at + // and delete_at times into the future. Details on bug + // #17293. if final { updates["is_trashed"] = true } else { @@ -1453,16 +1340,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C updates["delete_at"] = exp } reqBody := arvadosclient.Dict{"collection": updates} + var err2 error if runner.logUUID == "" { reqBody["ensure_unique_name"] = true - err = runner.DispatcherArvClient.Create("collections", reqBody, &response) + err2 = runner.DispatcherArvClient.Create("collections", reqBody, &response) } else { - err = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response) + err2 = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response) } - if err != nil { - return + if err2 == nil { + runner.logUUID = response.UUID + } + + if err1 != nil || err2 != nil { + err = fmt.Errorf("error recording logs: %q, %q", err1, err2) } - runner.logUUID = response.UUID return } @@ -1474,7 +1365,7 @@ func (runner *ContainerRunner) UpdateContainerRunning() error { return ErrCancelled } return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, - arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gatewayAddress}}, nil) + arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil) } // ContainerToken returns the api_token the container (and any @@ -1536,7 +1427,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err // Run the full container lifecycle. func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String()) - runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID) + runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime()) hostname, hosterr := os.Hostname() if hosterr != nil { @@ -1562,6 +1453,7 @@ func (runner *ContainerRunner) Run() (err error) { return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State) } + var bindmounts map[string]bindmount defer func() { // checkErr prints e (unless it's nil) and sets err to // e (unless err is already non-nil). Thus, if err @@ -1596,9 +1488,12 @@ func (runner *ContainerRunner) Run() (err error) { // capture partial output and write logs } - checkErr("CaptureOutput", runner.CaptureOutput()) + if bindmounts != nil { + checkErr("CaptureOutput", runner.CaptureOutput(bindmounts)) + } checkErr("stopHoststat", runner.stopHoststat()) checkErr("CommitLogs", runner.CommitLogs()) + runner.CleanupDirs() checkErr("UpdateContainerFinal", runner.UpdateContainerFinal()) }() @@ -1608,8 +1503,16 @@ func (runner *ContainerRunner) Run() (err error) { return } + // set up FUSE mount and binds + bindmounts, err = runner.SetupMounts() + if err != nil { + runner.finalState = "Cancelled" + err = fmt.Errorf("While setting up mounts: %v", err) + return + } + // check for and/or load image - err = runner.LoadImage() + imageID, err := runner.LoadImage() if err != nil { if !runner.checkBrokenNode(err) { // Failed to load image but not due to a "broken node" @@ -1620,15 +1523,7 @@ func (runner *ContainerRunner) Run() (err error) { return } - // set up FUSE mount and binds - err = runner.SetupMounts() - if err != nil { - runner.finalState = "Cancelled" - err = fmt.Errorf("While setting up mounts: %v", err) - return - } - - err = runner.CreateContainer() + err = runner.CreateContainer(imageID, bindmounts) if err != nil { return } @@ -1704,6 +1599,9 @@ func (runner *ContainerRunner) fetchContainerRecord() error { return fmt.Errorf("error creating container API client: %v", err) } + runner.ContainerKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses) + runner.DispatcherKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses) + err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm) if err != nil { if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 { @@ -1721,14 +1619,12 @@ func (runner *ContainerRunner) fetchContainerRecord() error { func NewContainerRunner(dispatcherClient *arvados.Client, dispatcherArvClient IArvadosClient, dispatcherKeepClient IKeepClient, - docker ThinDockerClient, containerUUID string) (*ContainerRunner, error) { cr := &ContainerRunner{ dispatcherClient: dispatcherClient, DispatcherArvClient: dispatcherArvClient, DispatcherKeepClient: dispatcherKeepClient, - Docker: docker, } cr.NewLogWriter = cr.NewArvLogWriter cr.RunArvMount = cr.ArvMountCmd @@ -1767,6 +1663,7 @@ func NewContainerRunner(dispatcherClient *arvados.Client, } func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + log := log.New(stderr, "", 0) flags := flag.NewFlagSet(prog, flag.ContinueOnError) statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting") cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree") @@ -1774,19 +1671,15 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container") caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates") detach := flags.Bool("detach", false, "Detach from parent process and run in the background") - stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin") + stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin") sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)") kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID") list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes") - enableNetwork := flags.String("container-enable-networking", "default", - `Specify if networking should be enabled for container. One of 'default', 'always': - default: only enable networking if container requests it. - always: containers always have networking enabled - `) - networkMode := flags.String("container-network-mode", "default", - `Set networking mode for container. Corresponds to Docker network mode (--net). - `) + enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage") + enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)") + networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`) memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container") + runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity") flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.") ignoreDetachFlag := false @@ -1806,35 +1699,51 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } else if err != nil { log.Print(err) return 1 + } else if flags.NArg() != 1 { + fmt.Fprintf(flags.Output(), "Usage: %s [options] containerUUID\n\nOptions:\n", prog) + flags.PrintDefaults() + return 2 } - if *stdinEnv && !ignoreDetachFlag { - // Load env vars on stdin if asked (but not in a - // detached child process, in which case stdin is - // /dev/null). - err := loadEnv(os.Stdin) - if err != nil { - log.Print(err) - return 1 - } - } - - containerID := flags.Arg(0) + containerUUID := flags.Arg(0) switch { case *detach && !ignoreDetachFlag: - return Detach(containerID, prog, args, os.Stdout, os.Stderr) + return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr) case *kill >= 0: - return KillProcess(containerID, syscall.Signal(*kill), os.Stdout, os.Stderr) + return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr) case *list: return ListProcesses(os.Stdout, os.Stderr) } - if containerID == "" { + if len(containerUUID) != 27 { log.Printf("usage: %s [options] UUID", prog) return 1 } + var conf ConfigData + if *stdinConfig { + err := json.NewDecoder(stdin).Decode(&conf) + if err != nil { + log.Printf("decode stdin: %s", err) + return 1 + } + for k, v := range conf.Env { + err = os.Setenv(k, v) + if err != nil { + log.Printf("setenv(%q): %s", k, err) + return 1 + } + } + if conf.Cluster != nil { + // ClusterID is missing from the JSON + // representation, but we need it to generate + // a valid config file for keepstore, so we + // fill it using the container UUID prefix. + conf.Cluster.ClusterID = containerUUID[:5] + } + } + log.Printf("crunch-run %s started", cmd.Version.String()) time.Sleep(*sleep) @@ -1842,48 +1751,121 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s arvadosclient.CertFiles = []string{*caCertsPath} } + var keepstoreLogbuf bufThenWrite + keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr)) + if err != nil { + log.Print(err) + return 1 + } + if keepstore != nil { + defer keepstore.Process.Kill() + } + api, err := arvadosclient.MakeArvadosClient() if err != nil { - log.Printf("%s: %v", containerID, err) + log.Printf("%s: %v", containerUUID, err) return 1 } api.Retries = 8 - kc, kcerr := keepclient.MakeKeepClient(api) - if kcerr != nil { - log.Printf("%s: %v", containerID, kcerr) + kc, err := keepclient.MakeKeepClient(api) + if err != nil { + log.Printf("%s: %v", containerUUID, err) return 1 } kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} kc.Retries = 4 - // API version 1.21 corresponds to Docker 1.9, which is currently the - // minimum version we want to support. - docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) - - cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID) + cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID) if err != nil { log.Print(err) return 1 } - if dockererr != nil { - cr.CrunchLog.Printf("%s: %v", containerID, dockererr) - cr.checkBrokenNode(dockererr) + + if keepstore == nil { + // Log explanation (if any) for why we're not running + // a local keepstore. + var buf bytes.Buffer + keepstoreLogbuf.SetWriter(&buf) + if buf.Len() > 0 { + cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String())) + } + } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" { + cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES")) + keepstoreLogbuf.SetWriter(io.Discard) + } else { + cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES")) + logwriter, err := cr.NewLogWriter("keepstore") + if err != nil { + log.Print(err) + return 1 + } + cr.keepstoreLogger = NewThrottledLogger(logwriter) + + var writer io.WriteCloser = cr.keepstoreLogger + if logWhat == "errors" { + writer = &filterKeepstoreErrorsOnly{WriteCloser: writer} + } else if logWhat != "all" { + // should have been caught earlier by + // dispatcher's config loader + log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat) + return 1 + } + err = keepstoreLogbuf.SetWriter(writer) + if err != nil { + log.Print(err) + return 1 + } + cr.keepstoreLogbuf = &keepstoreLogbuf + } + + switch *runtimeEngine { + case "docker": + cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval) + case "singularity": + cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf) + default: + cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine) + cr.CrunchLog.Close() + return 1 + } + if err != nil { + cr.CrunchLog.Printf("%s: %v", containerUUID, err) + cr.checkBrokenNode(err) cr.CrunchLog.Close() return 1 } + defer cr.executor.Close() - cr.gatewayAuthSecret = os.Getenv("GatewayAuthSecret") + gwAuthSecret := os.Getenv("GatewayAuthSecret") os.Unsetenv("GatewayAuthSecret") - err = cr.startGatewayServer() - if err != nil { - log.Printf("error starting gateway server: %s", err) - return 1 + if gwAuthSecret == "" { + // not safe to run a gateway service without an auth + // secret + cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)") + } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" { + // dispatcher did not tell us which external IP + // address to advertise --> no gateway service + cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)") + } else if de, ok := cr.executor.(*dockerExecutor); ok { + cr.gateway = Gateway{ + Address: gwListen, + AuthSecret: gwAuthSecret, + ContainerUUID: containerUUID, + DockerContainerID: &de.containerID, + Log: cr.CrunchLog, + ContainerIPAddress: dockerContainerIPAddress(&de.containerID), + } + err = cr.gateway.Start() + if err != nil { + log.Printf("error starting gateway server: %s", err) + return 1 + } } - parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".") + parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerUUID+".") if tmperr != nil { - log.Printf("%s: %v", containerID, tmperr) + log.Printf("%s: %v", containerUUID, tmperr) return 1 } @@ -1891,6 +1873,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cr.statInterval = *statInterval cr.cgroupRoot = *cgroupRoot cr.expectCgroupParent = *cgroupParent + cr.enableMemoryLimit = *enableMemoryLimit cr.enableNetwork = *enableNetwork cr.networkMode = *networkMode if *cgroupParentSubsystem != "" { @@ -1917,27 +1900,104 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } if runerr != nil { - log.Printf("%s: %v", containerID, runerr) + log.Printf("%s: %v", containerUUID, runerr) return 1 } return 0 } -func loadEnv(rdr io.Reader) error { - buf, err := ioutil.ReadAll(rdr) +func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) { + if configData.Cluster == nil || configData.KeepBuffers < 1 { + return nil, nil + } + for uuid, vol := range configData.Cluster.Volumes { + if len(vol.AccessViaHosts) > 0 { + fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid) + return nil, nil + } + if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication { + fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication) + return nil, nil + } + } + + // Rather than have an alternate way to tell keepstore how + // many buffers to use when starting it this way, we just + // modify the cluster configuration that we feed it on stdin. + configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers + + ln, err := net.Listen("tcp", "localhost:0") if err != nil { - return fmt.Errorf("read stdin: %s", err) + return nil, err } - var env map[string]string - err = json.Unmarshal(buf, &env) + _, port, err := net.SplitHostPort(ln.Addr().String()) if err != nil { - return fmt.Errorf("decode stdin: %s", err) + ln.Close() + return nil, err } - for k, v := range env { - err = os.Setenv(k, v) + ln.Close() + url := "http://localhost:" + port + + fmt.Fprintf(logbuf, "starting keepstore on %s\n", url) + + var confJSON bytes.Buffer + err = json.NewEncoder(&confJSON).Encode(arvados.Config{ + Clusters: map[string]arvados.Cluster{ + configData.Cluster.ClusterID: *configData.Cluster, + }, + }) + if err != nil { + return nil, err + } + cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-") + if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") { + // If we're a 'go test' process, running + // /proc/self/exe would start the test suite in a + // child process, which is not what we want. + cmd.Path, _ = exec.LookPath("go") + cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...) + cmd.Env = os.Environ() + } + cmd.Stdin = &confJSON + cmd.Stdout = logbuf + cmd.Stderr = logbuf + cmd.Env = append(cmd.Env, + "GOGC=10", + "ARVADOS_SERVICE_INTERNAL_URL="+url) + err = cmd.Start() + if err != nil { + return nil, fmt.Errorf("error starting keepstore process: %w", err) + } + cmdExited := false + go func() { + cmd.Wait() + cmdExited = true + }() + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10)) + defer cancel() + poll := time.NewTicker(time.Second / 10) + defer poll.Stop() + client := http.Client{} + for range poll.C { + testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil) + testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken) if err != nil { - return fmt.Errorf("setenv(%q): %s", k, err) + return nil, err + } + resp, err := client.Do(testReq) + if err == nil { + resp.Body.Close() + if resp.StatusCode == http.StatusOK { + break + } + } + if cmdExited { + return nil, fmt.Errorf("keepstore child process exited") + } + if ctx.Err() != nil { + return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy") } } - return nil + os.Setenv("ARVADOS_KEEP_SERVICES", url) + return cmd, nil }