X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b9b4502bcddeccd794614bf6979d643f9f350877..3f7bde601546dc898975fbe7d56957794985fe43:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 0793729172..810955c0da 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -49,7 +53,7 @@ var ErrCancelled = errors.New("Cancelled") // IKeepClient is the minimal Keep API methods used by crunch-run. type IKeepClient interface { PutHB(hash string, buf []byte) (string, int, error) - ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) + ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) } // NewLogWriter is a factory function to create a new log writer. @@ -66,7 +70,7 @@ type ThinDockerClient interface { networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error ContainerStop(ctx context.Context, container string, timeout *time.Duration) error - ContainerWait(ctx context.Context, container string) (int64, error) + ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) @@ -100,8 +104,8 @@ func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container } // ContainerWait 
invokes dockerclient.Client.ContainerWait -func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) { - return proxy.Docker.ContainerWait(ctx, container) +func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) { + return proxy.Docker.ContainerWait(ctx, container, condition) } // ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw @@ -676,7 +680,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) { runner.CrunchLog.Print("Attaching container streams") // If stdin mount is provided, attach it to the docker container - var stdinRdr keepclient.Reader + var stdinRdr arvados.File var stdinJson []byte if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok { if stdinMnt.Kind == "collection" { @@ -769,14 +773,14 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { if err != nil { return nil, fmt.Errorf("While Stat on temp dir: %v", err) } - stdoutPath := path.Join(runner.HostOutputDir, subdirs) + stdoutPath := filepath.Join(runner.HostOutputDir, subdirs) err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777) if err != nil { return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err) } } } - stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath)) + stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, stdoutPath)) if err != nil { return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err) } @@ -800,11 +804,13 @@ func (runner *ContainerRunner) CreateContainer() error { runner.ContainerConfig.Volumes = runner.Volumes runner.HostConfig = dockercontainer.HostConfig{ - Binds: runner.Binds, - Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent), + Binds: runner.Binds, LogConfig: dockercontainer.LogConfig{ Type: "none", }, + Resources: dockercontainer.Resources{ + CgroupParent: 
runner.setCgroupParent, + }, } if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI { @@ -862,21 +868,28 @@ func (runner *ContainerRunner) StartContainer() error { // WaitFinish waits for the container to terminate, capture the exit code, and // close the stdout/stderr logging. -func (runner *ContainerRunner) WaitFinish() error { +func (runner *ContainerRunner) WaitFinish() (err error) { runner.CrunchLog.Print("Waiting for container to finish") - waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID) + waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running") + + var waitBody dockercontainer.ContainerWaitOKBody + select { + case waitBody = <-waitOk: + case err = <-waitErr: + } + if err != nil { return fmt.Errorf("container wait: %v", err) } - runner.CrunchLog.Printf("Container exited with code: %v", waitDocker) - code := int(waitDocker) + runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode) + code := int(waitBody.StatusCode) runner.ExitCode = &code waitMount := runner.ArvMountExit select { - case err := <-waitMount: + case err = <-waitMount: runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err) waitMount = nil runner.stop() @@ -919,14 +932,91 @@ func (runner *ContainerRunner) CaptureOutput() error { return fmt.Errorf("While checking host output path: %v", err) } + // Pre-populate output from the configured mount points + var binds []string + for bind, mnt := range runner.Container.Mounts { + if mnt.Kind == "collection" { + binds = append(binds, bind) + } + } + sort.Strings(binds) + var manifestText string collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir) _, err = os.Stat(collectionMetafile) if err != nil { // Regular directory + + // Find symlinks to arv-mounted files & dirs. 
+ err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.Mode()&os.ModeSymlink == 0 { + return nil + } + // read link to get container internal path + // only support 1 level of symlinking here. + var tgt string + tgt, err = os.Readlink(path) + if err != nil { + return err + } + + // get path relative to output dir + outputSuffix := path[len(runner.HostOutputDir):] + + if strings.HasPrefix(tgt, "/") { + // go through mounts and try reverse map to collection reference + for _, bind := range binds { + mnt := runner.Container.Mounts[bind] + if tgt == bind || strings.HasPrefix(tgt, bind+"/") { + // get path relative to bind + targetSuffix := tgt[len(bind):] + + // Copy mount and adjust the path to add path relative to the bind + adjustedMount := mnt + adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix) + + // get manifest text + var m string + m, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix) + if err != nil { + return err + } + manifestText = manifestText + m + // delete symlink so WriteTree won't try to dereference it. + os.Remove(path) + return nil + } + } + } + + // Not a link to a mount. Must be dereferenceable and + // point into the output directory. + tgt, err = filepath.EvalSymlinks(path) + if err != nil { + os.Remove(path) + return err + } + + // Symlink target must be within the output directory otherwise it's an error.
+ if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") { + os.Remove(path) + return fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.", + outputSuffix, tgt) + } + return nil + }) + if err != nil { + return fmt.Errorf("While checking output symlinks: %v", err) + } + cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}} - manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger) + var m string + m, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger) + manifestText = manifestText + m if err != nil { return fmt.Errorf("While uploading output files: %v", err) } @@ -946,13 +1036,6 @@ func (runner *ContainerRunner) CaptureOutput() error { manifestText = rec.ManifestText } - // Pre-populate output from the configured mount points - var binds []string - for bind, _ := range runner.Container.Mounts { - binds = append(binds, bind) - } - sort.Strings(binds) - for _, bind := range binds { mnt := runner.Container.Mounts[bind] @@ -1190,6 +1273,10 @@ func (runner *ContainerRunner) Run() (err error) { if err == nil { err = e } + if runner.finalState == "Complete" { + // There was an error in the finalization. + runner.finalState = "Cancelled" + } } // Log the error encountered in Run(), if any