X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a340487a7d406e73e51479a765a3d08bdb92b8d0..6a544620c4d5acb0c42cd56346f74454637904cb:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 4676a4f495..27bfa88730 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -49,7 +53,7 @@ var ErrCancelled = errors.New("Cancelled") // IKeepClient is the minimal Keep API methods used by crunch-run. type IKeepClient interface { PutHB(hash string, buf []byte) (string, int, error) - ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) + ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) } // NewLogWriter is a factory function to create a new log writer. @@ -66,7 +70,7 @@ type ThinDockerClient interface { networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error ContainerStop(ctx context.Context, container string, timeout *time.Duration) error - ContainerWait(ctx context.Context, container string) (int64, error) + ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) @@ -100,8 +104,8 @@ func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container } // ContainerWait invokes dockerclient.Client.ContainerWait -func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) { - return proxy.Docker.ContainerWait(ctx, container) +func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) { + return proxy.Docker.ContainerWait(ctx, container, condition) } // ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw @@ -341,7 +345,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { needCertMount := true var binds []string - for bind, _ := range runner.Container.Mounts { + for bind := range runner.Container.Mounts { binds = append(binds, bind) } sort.Strings(binds) @@ -588,23 +592,23 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) { logger := log.New(w, "node-info", 0) commands := []infoCommand{ - infoCommand{ + { label: "Host Information", cmd: []string{"uname", "-a"}, }, - infoCommand{ + { label: "CPU Information", cmd: []string{"cat", "/proc/cpuinfo"}, }, - infoCommand{ + { label: "Memory Information", cmd: []string{"cat", "/proc/meminfo"}, }, - infoCommand{ + { label: "Disk Space", cmd: []string{"df", "-m", "/", os.TempDir()}, }, - infoCommand{ + { label: "Disk INodes", cmd: []string{"df", "-i", "/", os.TempDir()}, }, @@ -646,14 +650,11 @@ func (runner *ContainerRunner) LogContainerRecord() (err error) { return fmt.Errorf("While retrieving container record from the API server: %v", err) } defer reader.Close() - // Read the API server response as []byte - json_bytes, err := ioutil.ReadAll(reader) - if err != nil { - return fmt.Errorf("While reading container record API server response: %v", err) - } - // Decode the JSON []byte + + dec := json.NewDecoder(reader) + dec.UseNumber() var cr map[string]interface{} - if err = json.Unmarshal(json_bytes, &cr); err != nil { + if err = dec.Decode(&cr); err != nil { return fmt.Errorf("While decoding the container record JSON response: %v", err) } // Re-encode it using indentation to improve readability @@ -676,7 +677,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) { runner.CrunchLog.Print("Attaching container streams") // If stdin mount is provided, attach it to the docker container - var stdinRdr keepclient.Reader + var stdinRdr arvados.File var stdinJson []byte if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok { if stdinMnt.Kind == "collection" { @@ -800,11 +801,13 @@ func (runner *ContainerRunner) CreateContainer() error { runner.ContainerConfig.Volumes = runner.Volumes runner.HostConfig = dockercontainer.HostConfig{ - Binds: runner.Binds, - Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent), + Binds: runner.Binds, LogConfig: dockercontainer.LogConfig{ Type: "none", }, + Resources: dockercontainer.Resources{ + CgroupParent: runner.setCgroupParent, + }, } if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI { @@ -862,21 +865,28 @@ func (runner *ContainerRunner) StartContainer() error { // WaitFinish waits for the container to terminate, capture the exit code, and // close the stdout/stderr logging. -func (runner *ContainerRunner) WaitFinish() error { +func (runner *ContainerRunner) WaitFinish() (err error) { runner.CrunchLog.Print("Waiting for container to finish") - waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID) + waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running") + + var waitBody dockercontainer.ContainerWaitOKBody + select { + case waitBody = <-waitOk: + case err = <-waitErr: + } + if err != nil { return fmt.Errorf("container wait: %v", err) } - runner.CrunchLog.Printf("Container exited with code: %v", waitDocker) - code := int(waitDocker) + runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode) + code := int(waitBody.StatusCode) runner.ExitCode = &code waitMount := runner.ArvMountExit select { - case err := <-waitMount: + case err = <-waitMount: runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err) waitMount = nil runner.stop() @@ -921,8 +931,10 @@ func (runner *ContainerRunner) CaptureOutput() error { // Pre-populate output from the configured mount points var binds []string - for bind, _ := range runner.Container.Mounts { - binds = append(binds, bind) + for bind, mnt := range runner.Container.Mounts { + if mnt.Kind == "collection" { + binds = append(binds, bind) + } } sort.Strings(binds) @@ -949,17 +961,16 @@ func (runner *ContainerRunner) CaptureOutput() error { return err } + // get path relative to output dir outputSuffix := path[len(runner.HostOutputDir):] if strings.HasPrefix(tgt, "/") { // go through mounts and try reverse map to collection reference for _, bind := range binds { mnt := runner.Container.Mounts[bind] - if tgt == bind || strings.HasPrefix(tgt, bind+"/") && mnt.Kind == "collection" { + if tgt == bind || strings.HasPrefix(tgt, bind+"/") { // get path relative to bind targetSuffix := tgt[len(bind):] - // get path relative to output dir - outputSuffix := path[len(runner.HostOutputDir):] // Copy mount and adjust the path to add path relative to the bind adjustedMount := mnt @@ -1290,9 +1301,8 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Close() }() - err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container) + err = runner.fetchContainerRecord() if err != nil { - err = fmt.Errorf("While getting container record: %v", err) return } @@ -1355,6 +1365,24 @@ func (runner *ContainerRunner) Run() (err error) { return } +// Fetch the current container record (uuid = runner.Container.UUID) +// into runner.Container. +func (runner *ContainerRunner) fetchContainerRecord() error { + reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil) + if err != nil { + return fmt.Errorf("error fetching container record: %v", err) + } + defer reader.Close() + + dec := json.NewDecoder(reader) + dec.UseNumber() + err = dec.Decode(&runner.Container) + if err != nil { + return fmt.Errorf("error decoding container record: %v", err) + } + return nil +} + // NewContainerRunner creates a new container runner. func NewContainerRunner(api IArvadosClient, kc IKeepClient,