X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/777e716728a7da63ece00df7e5bb8be7f9a2a1a3..6a544620c4d5acb0c42cd56346f74454637904cb:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index a05f61a858..27bfa88730 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -1,6 +1,11 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( + "bytes" "context" "encoding/json" "errors" @@ -48,7 +53,7 @@ var ErrCancelled = errors.New("Cancelled") // IKeepClient is the minimal Keep API methods used by crunch-run. type IKeepClient interface { PutHB(hash string, buf []byte) (string, int, error) - ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) + ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) } // NewLogWriter is a factory function to create a new log writer. @@ -65,7 +70,7 @@ type ThinDockerClient interface { networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error ContainerStop(ctx context.Context, container string, timeout *time.Duration) error - ContainerWait(ctx context.Context, container string) (int64, error) + ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) @@ -99,8 +104,8 @@ func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container } // ContainerWait invokes dockerclient.Client.ContainerWait -func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) { - return proxy.Docker.ContainerWait(ctx, container) +func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) { + return proxy.Docker.ContainerWait(ctx, container, condition) } // ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw @@ -134,7 +139,7 @@ type ContainerRunner struct { loggingDone chan bool CrunchLog *ThrottledLogger Stdout io.WriteCloser - Stderr *ThrottledLogger + Stderr io.WriteCloser LogCollection *CollectionWriter LogsPDH *string RunArvMount @@ -144,6 +149,7 @@ type ContainerRunner struct { HostOutputDir string CleanupTempDir []string Binds []string + Volumes map[string]struct{} OutputPDH *string SigChan chan os.Signal ArvMountExit chan error @@ -335,20 +341,21 @@ func (runner *ContainerRunner) SetupMounts() (err error) { collectionPaths := []string{} runner.Binds = nil + runner.Volumes = make(map[string]struct{}) needCertMount := true var binds []string - for bind, _ := range runner.Container.Mounts { + for bind := range runner.Container.Mounts { binds = append(binds, bind) } sort.Strings(binds) for _, bind := range binds { mnt := runner.Container.Mounts[bind] - if bind == "stdout" { + if bind == "stdout" || bind == "stderr" { // Is it a "file" mount kind? if mnt.Kind != "file" { - return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind) + return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind) } // Does path start with OutputPath? @@ -357,7 +364,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) { prefix += "/" } if !strings.HasPrefix(mnt.Path, prefix) { - return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix) + return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix) + } + } + + if bind == "stdin" { + // Is it a "collection" mount kind? + if mnt.Kind != "collection" && mnt.Kind != "json" { + return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind) } } @@ -372,7 +386,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } switch { - case mnt.Kind == "collection": + case mnt.Kind == "collection" && bind != "stdin": var src string if mnt.UUID != "" && mnt.PortableDataHash != "" { return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount") @@ -420,24 +434,25 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } collectionPaths = append(collectionPaths, src) - case mnt.Kind == "tmp" && bind == runner.Container.OutputPath: - runner.HostOutputDir, err = runner.MkTempDir("", "") + case mnt.Kind == "tmp": + var tmpdir string + tmpdir, err = runner.MkTempDir("", "") if err != nil { return fmt.Errorf("While creating mount temp dir: %v", err) } - st, staterr := os.Stat(runner.HostOutputDir) + st, staterr := os.Stat(tmpdir) if staterr != nil { return fmt.Errorf("While Stat on temp dir: %v", staterr) } - err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777) + err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777) if staterr != nil { return fmt.Errorf("While Chmod temp dir: %v", err) } - runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir) - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind)) - - case mnt.Kind == "tmp": - runner.Binds = append(runner.Binds, bind) + runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir) + runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind)) + if bind == runner.Container.OutputPath { + runner.HostOutputDir = tmpdir + } case mnt.Kind == "json": jsondata, err := json.Marshal(mnt.Content) @@ -577,23 +592,23 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) { logger := log.New(w, "node-info", 0) commands := []infoCommand{ - infoCommand{ + { label: "Host Information", cmd: []string{"uname", "-a"}, }, - infoCommand{ + { label: "CPU Information", cmd: []string{"cat", "/proc/cpuinfo"}, }, - infoCommand{ + { label: "Memory Information", cmd: []string{"cat", "/proc/meminfo"}, }, - infoCommand{ + { label: "Disk Space", cmd: []string{"df", "-m", "/", os.TempDir()}, }, - infoCommand{ + { label: "Disk INodes", cmd: []string{"df", "-i", "/", os.TempDir()}, }, @@ -623,25 +638,23 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) { // Get and save the raw JSON container record from the API server func (runner *ContainerRunner) LogContainerRecord() (err error) { w := &ArvLogWriter{ - runner.ArvClient, - runner.Container.UUID, - "container", - runner.LogCollection.Open("container.json"), + ArvClient: runner.ArvClient, + UUID: runner.Container.UUID, + loggingStream: "container", + writeCloser: runner.LogCollection.Open("container.json"), } + // Get Container record JSON from the API Server reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil) if err != nil { return fmt.Errorf("While retrieving container record from the API server: %v", err) } defer reader.Close() - // Read the API server response as []byte - json_bytes, err := ioutil.ReadAll(reader) - if err != nil { - return fmt.Errorf("While reading container record API server response: %v", err) - } - // Decode the JSON []byte + + dec := json.NewDecoder(reader) + dec.UseNumber() var cr map[string]interface{} - if err = json.Unmarshal(json_bytes, &cr); err != nil { + if err = dec.Decode(&cr); err != nil { return fmt.Errorf("While decoding the container record JSON response: %v", err) } // Re-encode it using indentation to improve readability @@ -657,14 +670,44 @@ func (runner *ContainerRunner) LogContainerRecord() (err error) { return nil } -// AttachLogs connects the docker container stdout and stderr logs to the -// Arvados logger which logs to Keep and the API server logs table. +// AttachStreams connects the docker container stdin, stdout and stderr logs +// to the Arvados logger which logs to Keep and the API server logs table. func (runner *ContainerRunner) AttachStreams() (err error) { runner.CrunchLog.Print("Attaching container streams") + // If stdin mount is provided, attach it to the docker container + var stdinRdr arvados.File + var stdinJson []byte + if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok { + if stdinMnt.Kind == "collection" { + var stdinColl arvados.Collection + collId := stdinMnt.UUID + if collId == "" { + collId = stdinMnt.PortableDataHash + } + err = runner.ArvClient.Get("collections", collId, nil, &stdinColl) + if err != nil { + return fmt.Errorf("While getting stding collection: %v", err) + } + + stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path) + if os.IsNotExist(err) { + return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path) + } else if err != nil { + return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err) + } + } else if stdinMnt.Kind == "json" { + stdinJson, err = json.Marshal(stdinMnt.Content) + if err != nil { + return fmt.Errorf("While encoding stdin json data: %v", err) + } + } + } + + stdinUsed := stdinRdr != nil || len(stdinJson) != 0 response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID, - dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true}) + dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true}) if err != nil { return fmt.Errorf("While attaching container stdout/stderr streams: %v", err) } @@ -672,37 +715,76 @@ func (runner *ContainerRunner) AttachStreams() (err error) { runner.loggingDone = make(chan bool) if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok { - stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):] - index := strings.LastIndex(stdoutPath, "/") - if index > 0 { - subdirs := stdoutPath[:index] - if subdirs != "" { - st, err := os.Stat(runner.HostOutputDir) - if err != nil { - return fmt.Errorf("While Stat on temp dir: %v", err) - } - stdoutPath := path.Join(runner.HostOutputDir, subdirs) - err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777) - if err != nil { - return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err) - } - } - } - stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath)) + stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path) if err != nil { - return fmt.Errorf("While creating stdout file: %v", err) + return err } runner.Stdout = stdoutFile } else { runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout")) } - runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr")) + + if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok { + stderrFile, err := runner.getStdoutFile(stderrMnt.Path) + if err != nil { + return err + } + runner.Stderr = stderrFile + } else { + runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr")) + } + + if stdinRdr != nil { + go func() { + _, err := io.Copy(response.Conn, stdinRdr) + if err != nil { + runner.CrunchLog.Print("While writing stdin collection to docker container %q", err) + runner.stop() + } + stdinRdr.Close() + response.CloseWrite() + }() + } else if len(stdinJson) != 0 { + go func() { + _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson)) + if err != nil { + runner.CrunchLog.Print("While writing stdin json to docker container %q", err) + runner.stop() + } + response.CloseWrite() + }() + } go runner.ProcessDockerAttach(response.Reader) return nil } +func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { + stdoutPath := mntPath[len(runner.Container.OutputPath):] + index := strings.LastIndex(stdoutPath, "/") + if index > 0 { + subdirs := stdoutPath[:index] + if subdirs != "" { + st, err := os.Stat(runner.HostOutputDir) + if err != nil { + return nil, fmt.Errorf("While Stat on temp dir: %v", err) + } + stdoutPath := filepath.Join(runner.HostOutputDir, subdirs) + err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777) + if err != nil { + return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err) + } + } + } + stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, stdoutPath)) + if err != nil { + return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err) + } + + return stdoutFile, nil +} + // CreateContainer creates the docker container. func (runner *ContainerRunner) CreateContainer() error { runner.CrunchLog.Print("Creating Docker container") @@ -716,12 +798,16 @@ func (runner *ContainerRunner) CreateContainer() error { runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v) } + runner.ContainerConfig.Volumes = runner.Volumes + runner.HostConfig = dockercontainer.HostConfig{ - Binds: runner.Binds, - Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent), + Binds: runner.Binds, LogConfig: dockercontainer.LogConfig{ Type: "none", }, + Resources: dockercontainer.Resources{ + CgroupParent: runner.setCgroupParent, + }, } if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI { @@ -743,6 +829,13 @@ func (runner *ContainerRunner) CreateContainer() error { } } + _, stdinUsed := runner.Container.Mounts["stdin"] + runner.ContainerConfig.OpenStdin = stdinUsed + runner.ContainerConfig.StdinOnce = stdinUsed + runner.ContainerConfig.AttachStdin = stdinUsed + runner.ContainerConfig.AttachStdout = true + runner.ContainerConfig.AttachStderr = true + createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID) if err != nil { return fmt.Errorf("While creating container: %v", err) @@ -772,21 +865,28 @@ func (runner *ContainerRunner) StartContainer() error { // WaitFinish waits for the container to terminate, capture the exit code, and // close the stdout/stderr logging. -func (runner *ContainerRunner) WaitFinish() error { +func (runner *ContainerRunner) WaitFinish() (err error) { runner.CrunchLog.Print("Waiting for container to finish") - waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID) + waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running") + + var waitBody dockercontainer.ContainerWaitOKBody + select { + case waitBody = <-waitOk: + case err = <-waitErr: + } + if err != nil { return fmt.Errorf("container wait: %v", err) } - runner.CrunchLog.Printf("Container exited with code: %v", waitDocker) - code := int(waitDocker) + runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode) + code := int(waitBody.StatusCode) runner.ExitCode = &code waitMount := runner.ArvMountExit select { - case err := <-waitMount: + case err = <-waitMount: runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err) waitMount = nil runner.stop() @@ -829,14 +929,91 @@ func (runner *ContainerRunner) CaptureOutput() error { return fmt.Errorf("While checking host output path: %v", err) } + // Pre-populate output from the configured mount points + var binds []string + for bind, mnt := range runner.Container.Mounts { + if mnt.Kind == "collection" { + binds = append(binds, bind) + } + } + sort.Strings(binds) + var manifestText string collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir) _, err = os.Stat(collectionMetafile) if err != nil { // Regular directory + + // Find symlinks to arv-mounted files & dirs. + err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.Mode()&os.ModeSymlink == 0 { + return nil + } + // read link to get container internal path + // only support 1 level of symlinking here. + var tgt string + tgt, err = os.Readlink(path) + if err != nil { + return err + } + + // get path relative to output dir + outputSuffix := path[len(runner.HostOutputDir):] + + if strings.HasPrefix(tgt, "/") { + // go through mounts and try reverse map to collection reference + for _, bind := range binds { + mnt := runner.Container.Mounts[bind] + if tgt == bind || strings.HasPrefix(tgt, bind+"/") { + // get path relative to bind + targetSuffix := tgt[len(bind):] + + // Copy mount and adjust the path to add path relative to the bind + adjustedMount := mnt + adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix) + + // get manifest text + var m string + m, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix) + if err != nil { + return err + } + manifestText = manifestText + m + // delete symlink so WriteTree won't try to to dereference it. + os.Remove(path) + return nil + } + } + } + + // Not a link to a mount. Must be dereferencible and + // point into the output directory. + tgt, err = filepath.EvalSymlinks(path) + if err != nil { + os.Remove(path) + return err + } + + // Symlink target must be within the output directory otherwise it's an error. + if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") { + os.Remove(path) + return fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.", + outputSuffix, tgt) + } + return nil + }) + if err != nil { + return fmt.Errorf("While checking output symlinks: %v", err) + } + cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}} - manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger) + var m string + m, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger) + manifestText = manifestText + m if err != nil { return fmt.Errorf("While uploading output files: %v", err) } @@ -856,13 +1033,6 @@ func (runner *ContainerRunner) CaptureOutput() error { manifestText = rec.ManifestText } - // Pre-populate output from the configured mount points - var binds []string - for bind, _ := range runner.Container.Mounts { - binds = append(binds, bind) - } - sort.Strings(binds) - for _, bind := range binds { mnt := runner.Container.Mounts[bind] @@ -977,8 +1147,8 @@ func (runner *ContainerRunner) CommitLogs() error { // point, but re-open crunch log with ArvClient in case there are any // other further (such as failing to write the log to Keep!) while // shutting down - runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID, - "crunch-run", nil}) + runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient, + UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil}) if runner.LogsPDH != nil { // If we have already assigned something to LogsPDH, @@ -1065,7 +1235,8 @@ func (runner *ContainerRunner) IsCancelled() bool { // NewArvLogWriter creates an ArvLogWriter func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser { - return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")} + return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name, + writeCloser: runner.LogCollection.Open(name + ".txt")} } // Run the full container lifecycle. @@ -1099,6 +1270,10 @@ func (runner *ContainerRunner) Run() (err error) { if err == nil { err = e } + if runner.finalState == "Complete" { + // There was an error in the finalization. + runner.finalState = "Cancelled" + } } // Log the error encountered in Run(), if any @@ -1126,9 +1301,8 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Close() }() - err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container) + err = runner.fetchContainerRecord() if err != nil { - err = fmt.Errorf("While getting container record: %v", err) return } @@ -1191,6 +1365,24 @@ func (runner *ContainerRunner) Run() (err error) { return } +// Fetch the current container record (uuid = runner.Container.UUID) +// into runner.Container. +func (runner *ContainerRunner) fetchContainerRecord() error { + reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil) + if err != nil { + return fmt.Errorf("error fetching container record: %v", err) + } + defer reader.Close() + + dec := json.NewDecoder(reader) + dec.UseNumber() + err = dec.Decode(&runner.Container) + if err != nil { + return fmt.Errorf("error decoding container record: %v", err) + } + return nil +} + // NewContainerRunner creates a new container runner. func NewContainerRunner(api IArvadosClient, kc IKeepClient, @@ -1205,6 +1397,9 @@ func NewContainerRunner(api IArvadosClient, cr.Container.UUID = containerUUID cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run")) cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) + + loadLogThrottleParams(api) + return cr }