+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
// IKeepClient is the minimal set of Keep API methods used by crunch-run.
type IKeepClient interface {
PutHB(hash string, buf []byte) (string, int, error)
- ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
+ ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
}
// NewLogWriter is a factory function to create a new log writer.
networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
- ContainerWait(ctx context.Context, container string) (int64, error)
+ ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
}
// ContainerWait invokes dockerclient.Client.ContainerWait, passing ctx, container and condition through unchanged.
-func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) {
- return proxy.Docker.ContainerWait(ctx, container)
+func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
+ return proxy.Docker.ContainerWait(ctx, container, condition)
}
// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
needCertMount := true
var binds []string
- for bind, _ := range runner.Container.Mounts {
+ for bind := range runner.Container.Mounts {
binds = append(binds, bind)
}
sort.Strings(binds)
logger := log.New(w, "node-info", 0)
commands := []infoCommand{
- infoCommand{
+ {
label: "Host Information",
cmd: []string{"uname", "-a"},
},
- infoCommand{
+ {
label: "CPU Information",
cmd: []string{"cat", "/proc/cpuinfo"},
},
- infoCommand{
+ {
label: "Memory Information",
cmd: []string{"cat", "/proc/meminfo"},
},
- infoCommand{
+ {
label: "Disk Space",
cmd: []string{"df", "-m", "/", os.TempDir()},
},
- infoCommand{
+ {
label: "Disk INodes",
cmd: []string{"df", "-i", "/", os.TempDir()},
},
return fmt.Errorf("While retrieving container record from the API server: %v", err)
}
defer reader.Close()
- // Read the API server response as []byte
- json_bytes, err := ioutil.ReadAll(reader)
- if err != nil {
- return fmt.Errorf("While reading container record API server response: %v", err)
- }
- // Decode the JSON []byte
+
+ dec := json.NewDecoder(reader)
+ dec.UseNumber()
var cr map[string]interface{}
- if err = json.Unmarshal(json_bytes, &cr); err != nil {
+ if err = dec.Decode(&cr); err != nil {
return fmt.Errorf("While decoding the container record JSON response: %v", err)
}
// Re-encode it using indentation to improve readability
runner.CrunchLog.Print("Attaching container streams")
// If stdin mount is provided, attach it to the docker container
- var stdinRdr keepclient.Reader
+ var stdinRdr arvados.File
var stdinJson []byte
if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
if stdinMnt.Kind == "collection" {
if err != nil {
return nil, fmt.Errorf("While Stat on temp dir: %v", err)
}
- stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+ stdoutPath := filepath.Join(runner.HostOutputDir, subdirs)
err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
if err != nil {
return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
}
}
}
- stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, stdoutPath))
if err != nil {
return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
}
runner.ContainerConfig.Volumes = runner.Volumes
runner.HostConfig = dockercontainer.HostConfig{
- Binds: runner.Binds,
- Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
+ Binds: runner.Binds,
LogConfig: dockercontainer.LogConfig{
Type: "none",
},
+ Resources: dockercontainer.Resources{
+ CgroupParent: runner.setCgroupParent,
+ },
}
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
// WaitFinish waits for the container to terminate, capture the exit code, and
// close the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() error {
+func (runner *ContainerRunner) WaitFinish() (err error) {
runner.CrunchLog.Print("Waiting for container to finish")
- waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID)
+ waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+
+ var waitBody dockercontainer.ContainerWaitOKBody
+ select {
+ case waitBody = <-waitOk:
+ case err = <-waitErr:
+ }
+
if err != nil {
return fmt.Errorf("container wait: %v", err)
}
- runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
- code := int(waitDocker)
+ runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+ code := int(waitBody.StatusCode)
runner.ExitCode = &code
waitMount := runner.ArvMountExit
select {
- case err := <-waitMount:
+ case err = <-waitMount:
runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
waitMount = nil
runner.stop()
return fmt.Errorf("While checking host output path: %v", err)
}
+ // Pre-populate output from the configured mount points
+ var binds []string
+ for bind, mnt := range runner.Container.Mounts {
+ if mnt.Kind == "collection" {
+ binds = append(binds, bind)
+ }
+ }
+ sort.Strings(binds)
+
var manifestText string
collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
_, err = os.Stat(collectionMetafile)
if err != nil {
// Regular directory
+
+ // Find symlinks to arv-mounted files & dirs.
+ err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if info.Mode()&os.ModeSymlink == 0 {
+ return nil
+ }
+ // read link to get container internal path
+ // only support 1 level of symlinking here.
+ var tgt string
+ tgt, err = os.Readlink(path)
+ if err != nil {
+ return err
+ }
+
+ // get path relative to output dir
+ outputSuffix := path[len(runner.HostOutputDir):]
+
+ if strings.HasPrefix(tgt, "/") {
+ // go through mounts and try reverse map to collection reference
+ for _, bind := range binds {
+ mnt := runner.Container.Mounts[bind]
+ if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+ // get path relative to bind
+ targetSuffix := tgt[len(bind):]
+
+ // Copy mount and adjust the path to add path relative to the bind
+ adjustedMount := mnt
+ adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
+
+ // get manifest text
+ var m string
+ m, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
+ if err != nil {
+ return err
+ }
+ manifestText = manifestText + m
+ // delete symlink so WriteTree won't try to dereference it.
+ os.Remove(path)
+ return nil
+ }
+ }
+ }
+
+ // Not a link to a mount. Must be dereferenceable and
+ // point into the output directory.
+ tgt, err = filepath.EvalSymlinks(path)
+ if err != nil {
+ os.Remove(path)
+ return err
+ }
+
+ // Symlink target must be within the output directory otherwise it's an error.
+ if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
+ os.Remove(path)
+ return fmt.Errorf("Output directory symlink %q points to invalid location %q, must point to mount or output directory.",
+ outputSuffix, tgt)
+ }
+ return nil
+ })
+ if err != nil {
+ return fmt.Errorf("While checking output symlinks: %v", err)
+ }
+
cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
- manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+ var m string
+ m, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+ manifestText = manifestText + m
if err != nil {
return fmt.Errorf("While uploading output files: %v", err)
}
manifestText = rec.ManifestText
}
- // Pre-populate output from the configured mount points
- var binds []string
- for bind, _ := range runner.Container.Mounts {
- binds = append(binds, bind)
- }
- sort.Strings(binds)
-
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
runner.CrunchLog.Close()
}()
- err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
+ err = runner.fetchContainerRecord()
if err != nil {
- err = fmt.Errorf("While getting container record: %v", err)
return
}
return
}
+// fetchContainerRecord fetches the current container record (uuid = runner.Container.UUID)
+// from the API server and decodes the JSON response into runner.Container.
+func (runner *ContainerRunner) fetchContainerRecord() error {
+ reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
+ if err != nil {
+ return fmt.Errorf("error fetching container record: %v", err)
+ }
+ defer reader.Close()
+
+ dec := json.NewDecoder(reader)
+ dec.UseNumber() // numbers decoded into interface{} values become json.Number rather than float64
+ err = dec.Decode(&runner.Container)
+ if err != nil {
+ return fmt.Errorf("error decoding container record: %v", err)
+ }
+ return nil
+}
+
// NewContainerRunner creates a new container runner.
func NewContainerRunner(api IArvadosClient,
kc IKeepClient,